hakurei/system/dbus.go
Ophestra e68db7fbfc
All checks were successful
Test / Create distribution (push) Successful in 52s
Test / Sandbox (push) Successful in 3m30s
Test / Hakurei (push) Successful in 5m40s
Test / Sandbox (race detector) (push) Successful in 6m30s
Test / Hpkg (push) Successful in 7m21s
Test / Hakurei (race detector) (push) Successful in 3m22s
Test / Flake checks (push) Successful in 2m2s
system: unexport Op implementations
None of these are valid with their zero value, and the implementations assume they are created by the builder methods. They are by all means an implementation detail and exporting them makes no sense.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-09-06 16:16:03 +09:00

201 lines
5.4 KiB
Go

package system
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"reflect"
"strings"
"sync"
"syscall"
"hakurei.app/container"
"hakurei.app/system/dbus"
)
// ErrDBusConfig is the underlying error reported by [I.ProxyDBus]
// when it is called without a session bus configuration.
var ErrDBusConfig = errors.New("dbus config not supplied")
// MustProxyDBus is the panicking variant of [I.ProxyDBus].
// It forwards its arguments to ProxyDBus (note the different parameter order),
// discards the returned dump function and returns sys so calls can be chained.
// If ProxyDBus fails, it panics with the text of the returned error.
func (sys *I) MustProxyDBus(sessionPath string, session *dbus.Config, systemPath string, system *dbus.Config) *I {
	_, err := sys.ProxyDBus(session, system, sessionPath, systemPath)
	if err != nil {
		// the panic value is the error string, not the error value itself
		panic(err.Error())
	}
	return sys
}
// ProxyDBus finalises the message bus proxy configuration ahead of time and queues
// an [Op] that starts xdg-dbus-proxy via [dbus] and terminates it on revert.
// This [Op] is always [Process] scoped.
//
// session must not be nil; otherwise an error carrying [ErrDBusConfig] is returned.
// system may be nil, in which case the op is recorded with the system bus disabled.
// On success the returned function is the Dump method of the writer that is later
// handed to the proxy.
func (sys *I) ProxyDBus(session, system *dbus.Config, sessionPath, systemPath string) (func(), error) {
	// session bus is required as otherwise this is effectively a very expensive noop
	if session == nil {
		return nil, newOpErrorMessage("dbus", ErrDBusConfig,
			"attempted to create message bus proxy args without session bus config", false)
	}

	// index 0 of each pair comes from sys.dbusAddress, index 1 is the caller-supplied path
	var sessionBus, systemBus dbus.ProxyPair
	sessionBus[0], systemBus[0] = sys.dbusAddress()
	sessionBus[1], systemBus[1] = sessionPath, systemPath

	op := &dbusProxyOp{
		// system bus is optional
		system: system != nil,
		out:    &linePrefixWriter{println: log.Println, prefix: "(dbus) ", buf: new(strings.Builder)},
	}

	final, err := sys.dbusFinalise(sessionBus, systemBus, session, system)
	if err != nil {
		if errors.Is(err, syscall.EINVAL) {
			return nil, newOpErrorMessage("dbus", err,
				"message bus proxy configuration contains NUL byte", false)
		}
		return nil, newOpErrorMessage("dbus", err,
			fmt.Sprintf("cannot finalise message bus proxy: %v", err), false)
	}
	op.final = final

	if sys.isVerbose() {
		sys.verbose("session bus proxy:", session.Args(sessionBus))
		if system != nil {
			sys.verbose("system bus proxy:", system.Args(systemBus))
		}
		// this calls the argsWt String method
		sys.verbose("message bus proxy final args:", final.WriterTo)
	}

	sys.ops = append(sys.ops, op)
	return op.out.Dump, nil
}
// dbusProxyOp implements [I.ProxyDBus].
// Instances are created by [I.ProxyDBus], which populates final, out and system.
type dbusProxyOp struct {
// proxy is assigned from dbus.New in apply and is nil before that
proxy *dbus.Proxy // populated during apply
// final is the result of sys.dbusFinalise, stored by ProxyDBus
final *dbus.Final
// out is the writer passed to dbus.New in apply; its Dump method is returned by ProxyDBus
out *linePrefixWriter
// whether system bus proxy is enabled
system bool
}
// Type always reports [Process] for the message bus proxy op.
func (d *dbusProxyOp) Type() Enablement {
	return Process
}
// apply creates the proxy from the finalised configuration via dbus.New, using sys.ctx
// and the op's output writer, then starts it through sys.dbusProxyStart.
// If starting fails, the output collected so far is dumped and an op error is returned.
func (d *dbusProxyOp) apply(sys *I) error {
	pair := d.final.Session
	sys.verbosef("session bus proxy on %q for upstream %q", pair[1], pair[0])
	if d.system {
		pair = d.final.System
		sys.verbosef("system bus proxy on %q for upstream %q", pair[1], pair[0])
	}

	d.proxy = dbus.New(sys.ctx, d.final, d.out)
	err := sys.dbusProxyStart(d.proxy)
	if err == nil {
		sys.verbose("starting message bus proxy", d.proxy)
		return nil
	}

	// surface whatever was written to the output writer before reporting the failure
	d.out.Dump()
	return newOpErrorMessage("dbus", err,
		fmt.Sprintf("cannot start message bus proxy: %v", err), false)
}
// revert closes the proxy via sys.dbusProxyClose and waits for it with sys.dbusProxyWait.
// The criteria argument is ignored since dbus is always process-scoped.
// A wait error matching context.Canceled is not treated as a failure: it is replaced
// with nil and only changes the verbose exit message.
func (d *dbusProxyOp) revert(sys *I, _ *Criteria) error {
	sys.verbose("terminating message bus proxy")
	sys.dbusProxyClose(d.proxy)

	// logged on the way out; the text depends on how the wait below ends
	msg := "message bus proxy exit"
	defer func() { sys.verbose(msg) }()

	waitErr := sys.dbusProxyWait(d.proxy)
	if errors.Is(waitErr, context.Canceled) {
		msg, waitErr = "message bus proxy canceled upstream", nil
	}

	// NOTE(review): relies on newOpErrorMessage handling a nil error — confirm against its definition
	return newOpErrorMessage("dbus", waitErr,
		fmt.Sprintf("message bus proxy error: %v", waitErr), true)
}
// Is reports whether o is a non-nil *dbusProxyOp equivalent to d.
// Both ops must have the same system flag and a non-nil final configuration with
// equal Session and System values, equal upstream address entries as decided by
// dbus.EqualAddrEntries, and deeply equal WriterTo values.
// The proxy and out fields do not take part in the comparison.
func (d *dbusProxyOp) Is(o Op) bool {
	target, ok := o.(*dbusProxyOp)
	if !ok || d == nil || target == nil {
		return false
	}
	if d.system != target.system {
		return false
	}

	a, b := d.final, target.final
	if a == nil || b == nil {
		return false
	}

	return a.Session == b.Session &&
		a.System == b.System &&
		dbus.EqualAddrEntries(a.SessionUpstream, b.SessionUpstream) &&
		dbus.EqualAddrEntries(a.SystemUpstream, b.SystemUpstream) &&
		reflect.DeepEqual(a.WriterTo, b.WriterTo)
}
// Path always returns [container.Nonexistent] for the message bus proxy op.
func (d *dbusProxyOp) Path() string {
	return container.Nonexistent
}
// String returns the string representation of the underlying proxy.
// NOTE(review): d.proxy is only assigned in apply, so before that this calls String
// on a nil *dbus.Proxy — confirm that method tolerates a nil receiver.
func (d *dbusProxyOp) String() string {
	return d.proxy.String()
}
const (
	// lpwSizeThreshold is the threshold of bytes written to linePrefixWriter which,
	// if reached or exceeded, causes linePrefixWriter to drop all future writes.
	lpwSizeThreshold = 1 << 24
)

// linePrefixWriter calls println with a prefix for every line written.
// Lines starting with "init: " are passed to println as soon as they are complete;
// every other line is held back until Dump is called.
type linePrefixWriter struct {
	// prefix is prepended to every line handed to println
	prefix string
	// println receives each emitted line as a single string argument
	println func(v ...any)

	// n counts the bytes currently accounted against lpwSizeThreshold:
	// held lines including their newline, plus the pending partial line
	n int
	// msg holds completed lines awaiting Dump
	msg []string
	// buf accumulates the current line until its newline arrives;
	// it is allocated on first write if nil
	buf *strings.Builder

	// mu is held exclusively by Write and shared by Dump
	mu sync.RWMutex
}

// Write splits p into lines and stores or passes them through as described on
// linePrefixWriter. It returns the number of bytes consumed. Once lpwSizeThreshold
// has been reached, any non-empty remainder is rejected with syscall.ENOMEM.
func (s *linePrefixWriter) Write(p []byte) (n int, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.write(p, 0)
}

// write consumes p line by line and returns a plus the number of bytes consumed.
// The caller must hold s.mu exclusively.
//
// It iterates rather than recursing once per line, so stack usage does not grow
// with the number of lines contained in a single call.
func (s *linePrefixWriter) write(p []byte, a int) (int, error) {
	// Dump already tolerates a nil buf; make writing tolerate it as well
	if s.buf == nil {
		s.buf = new(strings.Builder)
	}

	for {
		if s.n >= lpwSizeThreshold {
			if len(p) == 0 {
				return a, nil
			}
			return a, syscall.ENOMEM
		}

		i := bytes.IndexByte(p, '\n')
		if i == -1 {
			// no complete line left: keep the remainder until its newline arrives
			n, _ := s.buf.Write(p) // strings.Builder.Write always returns a nil error
			s.n += n
			return a + n, nil
		}

		n, _ := s.buf.Write(p[:i]) // strings.Builder.Write always returns a nil error
		s.n += n + 1
		line := s.buf.String()
		if strings.HasPrefix(line, "init: ") {
			// pass through container init messages; they are not retained,
			// so the whole line is taken back off the byte count
			s.n -= len(line) + 1
			s.println(s.prefix + line)
		} else {
			s.msg = append(s.msg, line)
		}
		s.buf.Reset()

		// account for the line and its newline, then continue after it
		p, a = p[i+1:], a+n+1
	}
}

// Dump passes every held line to println with the prefix prepended. A pending partial
// line is emitted with a leading "*", and a final line with a leading "+" is emitted if
// lpwSizeThreshold has been reached. Held lines are not cleared by Dump.
func (s *linePrefixWriter) Dump() {
	s.mu.RLock()
	// deferred so that a panicking println cannot leave the writer locked
	defer s.mu.RUnlock()

	for _, m := range s.msg {
		s.println(s.prefix + m)
	}
	if s.buf != nil && s.buf.Len() != 0 {
		s.println("*" + s.prefix + s.buf.String())
	}
	if s.n >= lpwSizeThreshold {
		s.println("+" + s.prefix + "write threshold reached, output may be incomplete")
	}
}