All checks were successful
Test / Create distribution (push) Successful in 35s
Test / Sandbox (push) Successful in 2m18s
Test / Hakurei (push) Successful in 3m17s
Test / Sandbox (race detector) (push) Successful in 4m7s
Test / Hpkg (push) Successful in 4m13s
Test / Hakurei (race detector) (push) Successful in 5m3s
Test / Flake checks (push) Successful in 1m40s
These packages are highly specific to hakurei and are difficult to use safely from other pieces of code. Their exported symbols are made available until v0.4.0 where they will be removed for #24. Signed-off-by: Ophestra <cat@gensokyo.uk>
208 lines
5.4 KiB
Go
208 lines
5.4 KiB
Go
package system
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
|
|
"hakurei.app/container"
|
|
"hakurei.app/hst"
|
|
"hakurei.app/internal/system/dbus"
|
|
)
|
|
|
|
// ErrDBusConfig is the sentinel error reported when a required [hst.BusConfig]
// argument is nil.
var ErrDBusConfig = errors.New("dbus config not supplied")
|
|
|
|
// MustProxyDBus calls ProxyDBus and panics if an error is returned.
|
|
func (sys *I) MustProxyDBus(
|
|
session, system *hst.BusConfig,
|
|
sessionBus, systemBus dbus.ProxyPair,
|
|
) *I {
|
|
if err := sys.ProxyDBus(session, system, sessionBus, systemBus); err != nil {
|
|
panic(err.Error())
|
|
} else {
|
|
return sys
|
|
}
|
|
}
|
|
|
|
// ProxyDBus finalises configuration ahead of time and starts xdg-dbus-proxy via [dbus] and terminates it on revert.
|
|
// This [Op] is always [Process] scoped.
|
|
func (sys *I) ProxyDBus(
|
|
session, system *hst.BusConfig,
|
|
sessionBus, systemBus dbus.ProxyPair,
|
|
) error {
|
|
d := new(dbusProxyOp)
|
|
|
|
// session bus is required as otherwise this is effectively a very expensive noop
|
|
if session == nil {
|
|
return newOpErrorMessage("dbus", ErrDBusConfig,
|
|
"attempted to create message bus proxy args without session bus config", false)
|
|
}
|
|
|
|
// system bus is optional
|
|
d.system = system != nil
|
|
|
|
d.out = &linePrefixWriter{println: log.Println, prefix: "(dbus) ", buf: new(strings.Builder)}
|
|
if final, err := sys.dbusFinalise(sessionBus, systemBus, session, system); err != nil {
|
|
if errors.Is(err, syscall.EINVAL) {
|
|
return newOpErrorMessage("dbus", err,
|
|
"message bus proxy configuration contains NUL byte", false)
|
|
}
|
|
return newOpErrorMessage("dbus", err,
|
|
fmt.Sprintf("cannot finalise message bus proxy: %v", err), false)
|
|
} else {
|
|
if sys.msg.IsVerbose() {
|
|
sys.msg.Verbose("session bus proxy:", dbus.Args(session, sessionBus))
|
|
if system != nil {
|
|
sys.msg.Verbose("system bus proxy:", dbus.Args(system, systemBus))
|
|
}
|
|
|
|
// this calls the argsWt String method
|
|
sys.msg.Verbose("message bus proxy final args:", final.WriterTo)
|
|
}
|
|
|
|
d.final = final
|
|
}
|
|
|
|
sys.ops = append(sys.ops, d)
|
|
return nil
|
|
}
|
|
|
|
// dbusProxyOp implements [I.ProxyDBus].
|
|
type dbusProxyOp struct {
|
|
proxy *dbus.Proxy // populated during apply
|
|
|
|
final *dbus.Final
|
|
out *linePrefixWriter
|
|
// whether system bus proxy is enabled
|
|
system bool
|
|
}
|
|
|
|
// Type reports the enablement of this op, which is always [Process].
func (d *dbusProxyOp) Type() hst.Enablement {
	return Process
}
|
|
|
|
func (d *dbusProxyOp) apply(sys *I) error {
|
|
sys.msg.Verbosef("session bus proxy on %q for upstream %q", d.final.Session[1], d.final.Session[0])
|
|
if d.system {
|
|
sys.msg.Verbosef("system bus proxy on %q for upstream %q", d.final.System[1], d.final.System[0])
|
|
}
|
|
|
|
d.proxy = dbus.New(sys.ctx, sys.msg, d.final, d.out)
|
|
if err := sys.dbusProxyStart(d.proxy); err != nil {
|
|
d.out.Dump()
|
|
return newOpErrorMessage("dbus", err,
|
|
fmt.Sprintf("cannot start message bus proxy: %v", err), false)
|
|
}
|
|
sys.msg.Verbose("starting message bus proxy", d.proxy)
|
|
return nil
|
|
}
|
|
|
|
func (d *dbusProxyOp) revert(sys *I, _ *Criteria) error {
|
|
// criteria ignored here since dbus is always process-scoped
|
|
sys.msg.Verbose("terminating message bus proxy")
|
|
sys.dbusProxyClose(d.proxy)
|
|
|
|
exitMessage := "message bus proxy exit"
|
|
defer func() { sys.msg.Verbose(exitMessage) }()
|
|
|
|
if d.out != nil {
|
|
d.out.Dump()
|
|
}
|
|
|
|
err := sys.dbusProxyWait(d.proxy)
|
|
if errors.Is(err, context.Canceled) {
|
|
exitMessage = "message bus proxy canceled upstream"
|
|
err = nil
|
|
}
|
|
return newOpErrorMessage("dbus", err,
|
|
fmt.Sprintf("message bus proxy error: %v", err), true)
|
|
}
|
|
|
|
func (d *dbusProxyOp) Is(o Op) bool {
|
|
target, ok := o.(*dbusProxyOp)
|
|
return ok && d != nil && target != nil &&
|
|
d.system == target.system &&
|
|
d.final != nil && target.final != nil &&
|
|
d.final.Session == target.final.Session &&
|
|
d.final.System == target.final.System &&
|
|
dbus.EqualAddrEntries(d.final.SessionUpstream, target.final.SessionUpstream) &&
|
|
dbus.EqualAddrEntries(d.final.SystemUpstream, target.final.SystemUpstream) &&
|
|
reflect.DeepEqual(d.final.WriterTo, target.final.WriterTo)
|
|
}
|
|
|
|
// Path always returns [container.Nonexistent].
func (d *dbusProxyOp) Path() string {
	return container.Nonexistent
}
|
|
// String returns the string form of the underlying proxy.
// The proxy field is only populated during apply.
func (d *dbusProxyOp) String() string {
	proxy := d.proxy
	return proxy.String()
}
|
|
|
|
const (
	// lpwSizeThreshold is the threshold of bytes written to linePrefixWriter which,
	// if reached or exceeded, causes linePrefixWriter to drop all future writes.
	lpwSizeThreshold = 1 << 24
)

// linePrefixWriter splits written data into lines and emits them through
// println, one call per line, with prefix prepended.
//
// Completed lines beginning with "init: " are passed to println immediately;
// every other completed line is held in msg until Dump is called.
type linePrefixWriter struct {
	// prefix is prepended to every line passed to println.
	prefix string
	// println receives each emitted line as a single string argument.
	println func(v ...any)

	// n is the number of bytes currently counted against lpwSizeThreshold.
	n int
	// msg holds completed lines awaiting Dump.
	msg []string
	// buf holds the current incomplete line; it is allocated on first write if nil.
	buf *strings.Builder

	// mu guards n, msg and buf.
	mu sync.RWMutex
}

// Write splits p into lines and stores or passes them through as described on
// linePrefixWriter. It returns the number of bytes of p consumed.
//
// Once lpwSizeThreshold has been reached, the remainder of a non-empty write is
// dropped and syscall.ENOMEM is returned.
func (s *linePrefixWriter) Write(p []byte) (n int, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.write(p, 0)
}

// write consumes p line by line and returns a plus the number of bytes consumed.
// The caller must hold s.mu for writing.
//
// Lines are processed in a loop rather than recursively so that stack usage
// does not grow with the number of newlines in p.
func (s *linePrefixWriter) write(p []byte, a int) (int, error) {
	// Dump tolerates a nil buf, so accept it here as well
	if s.buf == nil {
		s.buf = new(strings.Builder)
	}

	for {
		if s.n >= lpwSizeThreshold {
			if len(p) == 0 {
				return a, nil
			}
			return a, syscall.ENOMEM
		}

		i := bytes.IndexByte(p, '\n')
		if i == -1 {
			// no complete line left: keep the tail for a later write;
			// strings.Builder.Write always returns a nil error
			n, _ := s.buf.Write(p)
			s.n += n
			return a + n, nil
		}

		// strings.Builder.Write always returns a nil error
		n, _ := s.buf.Write(p[:i])
		s.n += n + 1 // account for the consumed newline

		line := s.buf.String()
		if strings.HasPrefix(line, "init: ") {
			// pass through container init messages; they are not retained,
			// so give their bytes (and the newline) back to the budget
			s.n -= len(line) + 1
			s.println(s.prefix + line)
		} else {
			s.msg = append(s.msg, line)
		}
		s.buf.Reset()

		p = p[i+1:]
		a += n + 1
	}
}

// Dump passes every held line to println with prefix prepended.
//
// A trailing incomplete line is emitted with a leading "*", and a notice with a
// leading "+" is emitted if the write threshold was reached. Held lines are
// not cleared, so a subsequent call emits them again.
func (s *linePrefixWriter) Dump() {
	s.mu.RLock()
	// deferred so the lock is released even if println panics
	defer s.mu.RUnlock()

	for _, m := range s.msg {
		s.println(s.prefix + m)
	}
	if s.buf != nil && s.buf.Len() != 0 {
		s.println("*" + s.prefix + s.buf.String())
	}
	if s.n >= lpwSizeThreshold {
		s.println("+" + s.prefix + "write threshold reached, output may be incomplete")
	}
}
|