1
0
forked from rosa/hakurei

cmd/mbf: daemon command

This services internal/pkg artifact IR with Rosa OS extensions originating from another process.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2026-04-17 01:06:13 +09:00
parent bcd79a22ff
commit 9daba60809
3 changed files with 316 additions and 0 deletions

151
cmd/mbf/daemon.go Normal file
View File

@@ -0,0 +1,151 @@
package main
import (
"context"
"errors"
"io"
"log"
"net"
"os"
"sync"
"testing"
"time"
"hakurei.app/check"
"hakurei.app/internal/pkg"
)
// daemonTimeout is the maximum amount of time cureFromIR will wait on I/O.
const daemonTimeout = 30 * time.Second
// daemonDeadline returns the deadline corresponding to daemonTimeout, or the
// zero value when running in a test.
func daemonDeadline() time.Time {
if testing.Testing() {
return time.Time{}
}
return time.Now().Add(daemonTimeout)
}
// cureFromIR services an IR curing request.
func cureFromIR(
ctx context.Context,
cache *pkg.Cache,
conn net.Conn,
) (pkg.Artifact, error) {
go func() {
<-ctx.Done()
_ = conn.SetDeadline(time.Now())
}()
if err := conn.SetReadDeadline(daemonDeadline()); err != nil {
return nil, errors.Join(err, conn.Close())
}
a, decodeErr := cache.NewDecoder(conn).Decode()
if decodeErr != nil {
_, err := conn.Write([]byte("\x00" + decodeErr.Error()))
return nil, errors.Join(decodeErr, err, conn.Close())
}
pathname, _, cureErr := cache.Cure(a)
if err := conn.SetWriteDeadline(daemonDeadline()); err != nil {
if !testing.Testing() || !errors.Is(err, io.ErrClosedPipe) {
return a, errors.Join(err, conn.Close())
}
}
if cureErr != nil {
_, err := conn.Write([]byte("\x00" + cureErr.Error()))
return a, errors.Join(cureErr, err, conn.Close())
}
_, err := conn.Write([]byte(pathname.String()))
if testing.Testing() && errors.Is(err, io.ErrClosedPipe) {
return a, nil
}
return a, errors.Join(err, conn.Close())
}
// serve services connections from a [net.UnixListener].
func serve(ctx context.Context, log *log.Logger, cm *cache, ul *net.UnixListener) error {
ul.SetUnlinkOnClose(true)
if cm.c == nil {
if err := cm.open(); err != nil {
return errors.Join(err, ul.Close())
}
}
var wg sync.WaitGroup
defer wg.Wait()
wg.Go(func() {
for {
if ctx.Err() != nil {
break
}
conn, err := ul.AcceptUnix()
if err != nil {
if !errors.Is(err, os.ErrDeadlineExceeded) {
log.Println(err)
}
continue
}
wg.Go(func() {
if a, _err := cureFromIR(ctx, cm.c, conn); _err != nil {
log.Println(_err)
} else {
log.Printf(
"fulfilled artifact %s",
pkg.Encode(cm.c.Ident(a).Value()),
)
}
})
}
})
<-ctx.Done()
if err := ul.SetDeadline(time.Now()); err != nil {
return errors.Join(err, ul.Close())
}
wg.Wait()
return ul.Close()
}
// cureRemote cures a [pkg.Artifact] on a daemon.
func cureRemote(
ctx context.Context,
addr *net.UnixAddr,
a pkg.Artifact,
) (*check.Absolute, error) {
conn, err := net.DialUnix("unix", nil, addr)
if err != nil {
return nil, err
}
go func() {
<-ctx.Done()
_ = conn.SetDeadline(time.Now())
}()
if err = pkg.NewIR().EncodeAll(conn, a); err != nil {
return nil, errors.Join(err, conn.Close())
} else if err = conn.CloseWrite(); err != nil {
return nil, errors.Join(err, conn.Close())
}
payload, recvErr := io.ReadAll(conn)
if err = errors.Join(recvErr, conn.Close()); err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
err = ctx.Err()
}
return nil, err
}
if len(payload) > 0 && payload[0] == 0 {
return nil, errors.New(string(payload[1:]))
}
var p *check.Absolute
p, err = check.NewAbs(string(payload))
return p, err
}

125
cmd/mbf/daemon_test.go Normal file
View File

@@ -0,0 +1,125 @@
package main
import (
"bytes"
"context"
"errors"
"io"
"log"
"net"
"os"
"path/filepath"
"testing"
"time"
"hakurei.app/check"
"hakurei.app/internal/pkg"
"hakurei.app/message"
)
func TestCureFromIR(t *testing.T) {
t.Parallel()
if !daemonDeadline().IsZero() {
t.Fatal("daemonDeadline did not return the zero value")
}
c, err := pkg.Open(
t.Context(),
message.New(log.New(os.Stderr, "cir: ", 0)),
0, 0, 0,
check.MustAbs(t.TempDir()),
)
if err != nil {
t.Fatalf("Open: error = %v", err)
}
defer c.Close()
client, server := net.Pipe()
done := make(chan struct{})
go func() {
defer close(done)
go func() {
<-t.Context().Done()
if _err := client.SetDeadline(time.Now()); _err != nil && !errors.Is(_err, io.ErrClosedPipe) {
panic(_err)
}
}()
if _err := c.EncodeAll(
client,
pkg.NewFile("check", []byte{0}),
); _err != nil {
panic(_err)
} else if _err = client.Close(); _err != nil {
panic(_err)
}
}()
a, cureErr := cureFromIR(t.Context(), c, server)
if cureErr != nil {
t.Fatalf("cureFromIR: error = %v", cureErr)
}
<-done
wantIdent := pkg.MustDecode("fiZf-ZY_Yq6qxJNrHbMiIPYCsGkUiKCRsZrcSELXTqZWtCnESlHmzV5ThhWWGGYG")
if gotIdent := c.Ident(a).Value(); gotIdent != wantIdent {
t.Errorf(
"cureFromIR: %s, want %s",
pkg.Encode(gotIdent), pkg.Encode(wantIdent),
)
}
}
func TestDaemon(t *testing.T) {
t.Parallel()
var buf bytes.Buffer
logger := log.New(&buf, "daemon: ", 0)
addr := net.UnixAddr{
Name: filepath.Join(t.TempDir(), "daemon"),
Net: "unix",
}
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
cm := cache{
ctx: ctx,
msg: message.New(logger),
base: t.TempDir(),
}
defer cm.Close()
ul, err := net.ListenUnix("unix", &addr)
if err != nil {
t.Fatalf("ListenUnix: error = %v", err)
}
done := make(chan struct{})
go func() {
defer close(done)
if _err := serve(ctx, logger, &cm, ul); _err != nil {
panic(_err)
}
}()
var p *check.Absolute
p, err = cureRemote(ctx, &addr, pkg.NewFile("check", []byte{0}))
if err != nil {
t.Fatalf("cureRemote: error = %v", err)
}
cancel()
<-done
const want = "fiZf-ZY_Yq6qxJNrHbMiIPYCsGkUiKCRsZrcSELXTqZWtCnESlHmzV5ThhWWGGYG"
if got := filepath.Base(p.String()); got != want {
t.Errorf("cureRemote: %s, want %s", got, want)
}
const wantLog = `daemon: fulfilled artifact fiZf-ZY_Yq6qxJNrHbMiIPYCsGkUiKCRsZrcSELXTqZWtCnESlHmzV5ThhWWGGYG
`
if gotLog := buf.String(); gotLog != wantLog {
t.Errorf("serve: logged\n%s\nwant\n%s", gotLog, wantLog)
}
}

View File

@@ -19,6 +19,7 @@ import (
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"path/filepath"
@@ -69,11 +70,20 @@ func main() {
var (
flagQuiet bool
addr net.UnixAddr
)
c := command.New(os.Stderr, log.Printf, "mbf", func([]string) error {
msg.SwapVerbose(!flagQuiet)
cm.ctx, cm.msg = ctx, msg
cm.base = os.ExpandEnv(cm.base)
addr.Net = "unix"
addr.Name = os.ExpandEnv(addr.Name)
if addr.Name == "" {
addr.Name = "daemon"
}
return nil
}).Flag(
&flagQuiet,
@@ -102,6 +112,10 @@ func main() {
),
"Do not restrict networked cure containers from connecting to host "+
"abstract UNIX sockets",
).Flag(
&addr.Name,
"socket", command.StringFlag("$MBF_DAEMON_SOCKET"),
"Pathname of socket to bind to",
)
c.NewCommand(
@@ -267,6 +281,19 @@ func main() {
)
}
c.NewCommand(
"daemon",
"Service artifact IR with Rosa OS extensions",
func(args []string) error {
ul, err := net.ListenUnix("unix", &addr)
if err != nil {
return err
}
log.Printf("listening on pathname socket at %s", addr.Name)
return serve(ctx, log.Default(), &cm, ul)
},
)
{
var (
flagGentoo string
@@ -374,6 +401,7 @@ func main() {
flagDump string
flagEnter bool
flagExport string
flagRemote bool
)
c.NewCommand(
"cure",
@@ -450,6 +478,13 @@ func main() {
"sh",
)
})
case flagRemote:
pathname, err := cureRemote(ctx, &addr, rosa.Std.Load(p))
if err == nil {
log.Println(pathname)
}
return err
}
},
).
@@ -467,6 +502,11 @@ func main() {
&flagEnter,
"enter", command.BoolFlag(false),
"Enter cure container with an interactive shell",
).
Flag(
&flagRemote,
"daemon", command.BoolFlag(false),
"Cure artifact on the daemon",
)
}