From 9daba6080992f7a965873936dc20a85385bbc269 Mon Sep 17 00:00:00 2001 From: Ophestra Date: Fri, 17 Apr 2026 01:06:13 +0900 Subject: [PATCH] cmd/mbf: daemon command This services internal/pkg artifact IR with Rosa OS extensions originating from another process. Signed-off-by: Ophestra --- cmd/mbf/daemon.go | 151 +++++++++++++++++++++++++++++++++++++++++ cmd/mbf/daemon_test.go | 125 ++++++++++++++++++++++++++++++++++ cmd/mbf/main.go | 40 +++++++++++ 3 files changed, 316 insertions(+) create mode 100644 cmd/mbf/daemon.go create mode 100644 cmd/mbf/daemon_test.go diff --git a/cmd/mbf/daemon.go b/cmd/mbf/daemon.go new file mode 100644 index 00000000..22eeb72b --- /dev/null +++ b/cmd/mbf/daemon.go @@ -0,0 +1,151 @@ +package main + +import ( + "context" + "errors" + "io" + "log" + "net" + "os" + "sync" + "testing" + "time" + + "hakurei.app/check" + "hakurei.app/internal/pkg" +) + +// daemonTimeout is the maximum amount of time cureFromIR will wait on I/O. +const daemonTimeout = 30 * time.Second + +// daemonDeadline returns the deadline corresponding to daemonTimeout, or the +// zero value when running in a test. +func daemonDeadline() time.Time { + if testing.Testing() { + return time.Time{} + } + return time.Now().Add(daemonTimeout) +} + +// cureFromIR services an IR curing request. +func cureFromIR( + ctx context.Context, + cache *pkg.Cache, + conn net.Conn, +) (pkg.Artifact, error) { + go func() { + <-ctx.Done() + _ = conn.SetDeadline(time.Now()) + }() + + if err := conn.SetReadDeadline(daemonDeadline()); err != nil { + return nil, errors.Join(err, conn.Close()) + } + + a, decodeErr := cache.NewDecoder(conn).Decode() + if decodeErr != nil { + _, err := conn.Write([]byte("\x00" + decodeErr.Error())) + return nil, errors.Join(decodeErr, err, conn.Close()) + } + + pathname, _, cureErr := cache.Cure(a) + if err := conn.SetWriteDeadline(daemonDeadline()); err != nil { + if !testing.Testing() || !errors.Is(err, io.ErrClosedPipe) { + return a, errors.Join(err, conn.Close()) + } + } + if cureErr != nil { + _, err := conn.Write([]byte("\x00" + cureErr.Error())) + return a, errors.Join(cureErr, err, conn.Close()) + } + _, err := conn.Write([]byte(pathname.String())) + if testing.Testing() && errors.Is(err, io.ErrClosedPipe) { + return a, nil + } + return a, errors.Join(err, conn.Close()) +} + +// serve services connections from a [net.UnixListener]. +func serve(ctx context.Context, log *log.Logger, cm *cache, ul *net.UnixListener) error { + ul.SetUnlinkOnClose(true) + if cm.c == nil { + if err := cm.open(); err != nil { + return errors.Join(err, ul.Close()) + } + } + + var wg sync.WaitGroup + defer wg.Wait() + + wg.Go(func() { + for { + if ctx.Err() != nil { + break + } + + conn, err := ul.AcceptUnix() + if err != nil { + if !errors.Is(err, os.ErrDeadlineExceeded) { + log.Println(err) + } + continue + } + wg.Go(func() { + if a, _err := cureFromIR(ctx, cm.c, conn); _err != nil { + log.Println(_err) + } else { + log.Printf( + "fulfilled artifact %s", + pkg.Encode(cm.c.Ident(a).Value()), + ) + } + }) + } + }) + + <-ctx.Done() + if err := ul.SetDeadline(time.Now()); err != nil { + return errors.Join(err, ul.Close()) + } + wg.Wait() + return ul.Close() +} + +// cureRemote cures a [pkg.Artifact] on a daemon. +func cureRemote( + ctx context.Context, + addr *net.UnixAddr, + a pkg.Artifact, +) (*check.Absolute, error) { + conn, err := net.DialUnix("unix", nil, addr) + if err != nil { + return nil, err + } + + go func() { + <-ctx.Done() + _ = conn.SetDeadline(time.Now()) + }() + + if err = pkg.NewIR().EncodeAll(conn, a); err != nil { + return nil, errors.Join(err, conn.Close()) + } else if err = conn.CloseWrite(); err != nil { + return nil, errors.Join(err, conn.Close()) + } + + payload, recvErr := io.ReadAll(conn) + if err = errors.Join(recvErr, conn.Close()); err != nil { + if errors.Is(err, os.ErrDeadlineExceeded) { + err = ctx.Err() + } + return nil, err + } + + if len(payload) > 0 && payload[0] == 0 { + return nil, errors.New(string(payload[1:])) + } + + var p *check.Absolute + p, err = check.NewAbs(string(payload)) + return p, err +} diff --git a/cmd/mbf/daemon_test.go b/cmd/mbf/daemon_test.go new file mode 100644 index 00000000..cf67059a --- /dev/null +++ b/cmd/mbf/daemon_test.go @@ -0,0 +1,125 @@ +package main + +import ( + "bytes" + "context" + "errors" + "io" + "log" + "net" + "os" + "path/filepath" + "testing" + "time" + + "hakurei.app/check" + "hakurei.app/internal/pkg" + "hakurei.app/message" +) + +func TestCureFromIR(t *testing.T) { + t.Parallel() + if !daemonDeadline().IsZero() { + t.Fatal("daemonDeadline did not return the zero value") + } + + c, err := pkg.Open( + t.Context(), + message.New(log.New(os.Stderr, "cir: ", 0)), + 0, 0, 0, + check.MustAbs(t.TempDir()), + ) + if err != nil { + t.Fatalf("Open: error = %v", err) + } + defer c.Close() + + client, server := net.Pipe() + done := make(chan struct{}) + go func() { + defer close(done) + go func() { + <-t.Context().Done() + if _err := client.SetDeadline(time.Now()); _err != nil && !errors.Is(_err, io.ErrClosedPipe) { + panic(_err) + } + }() + + if _err := c.EncodeAll( + client, + pkg.NewFile("check", []byte{0}), + ); _err != nil { + panic(_err) + } else if _err = client.Close(); _err != nil { + panic(_err) + } + }() + + a, cureErr := cureFromIR(t.Context(), c, server) + if cureErr != nil { + t.Fatalf("cureFromIR: error = %v", cureErr) + } + + <-done + wantIdent := pkg.MustDecode("fiZf-ZY_Yq6qxJNrHbMiIPYCsGkUiKCRsZrcSELXTqZWtCnESlHmzV5ThhWWGGYG") + if gotIdent := c.Ident(a).Value(); gotIdent != wantIdent { + t.Errorf( + "cureFromIR: %s, want %s", + pkg.Encode(gotIdent), pkg.Encode(wantIdent), + ) + } +} + +func TestDaemon(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + logger := log.New(&buf, "daemon: ", 0) + + addr := net.UnixAddr{ + Name: filepath.Join(t.TempDir(), "daemon"), + Net: "unix", + } + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + cm := cache{ + ctx: ctx, + msg: message.New(logger), + base: t.TempDir(), + } + defer cm.Close() + + ul, err := net.ListenUnix("unix", &addr) + if err != nil { + t.Fatalf("ListenUnix: error = %v", err) + } + + done := make(chan struct{}) + go func() { + defer close(done) + if _err := serve(ctx, logger, &cm, ul); _err != nil { + panic(_err) + } + }() + + var p *check.Absolute + p, err = cureRemote(ctx, &addr, pkg.NewFile("check", []byte{0})) + if err != nil { + t.Fatalf("cureRemote: error = %v", err) + } + cancel() + <-done + + const want = "fiZf-ZY_Yq6qxJNrHbMiIPYCsGkUiKCRsZrcSELXTqZWtCnESlHmzV5ThhWWGGYG" + if got := filepath.Base(p.String()); got != want { + t.Errorf("cureRemote: %s, want %s", got, want) + } + + const wantLog = `daemon: fulfilled artifact fiZf-ZY_Yq6qxJNrHbMiIPYCsGkUiKCRsZrcSELXTqZWtCnESlHmzV5ThhWWGGYG +` + if gotLog := buf.String(); gotLog != wantLog { + t.Errorf("serve: logged\n%s\nwant\n%s", gotLog, wantLog) + } +} diff --git a/cmd/mbf/main.go b/cmd/mbf/main.go index 1d260fe8..894f5a46 100644 --- a/cmd/mbf/main.go +++ b/cmd/mbf/main.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "log" + "net" "os" "os/signal" "path/filepath" @@ -69,11 +70,20 @@ func main() { var ( flagQuiet bool + + addr net.UnixAddr ) c := command.New(os.Stderr, log.Printf, "mbf", func([]string) error { msg.SwapVerbose(!flagQuiet) cm.ctx, cm.msg = ctx, msg cm.base = os.ExpandEnv(cm.base) + + addr.Net = "unix" + addr.Name = os.ExpandEnv(addr.Name) + if addr.Name == "" { + addr.Name = "daemon" + } + return nil }).Flag( &flagQuiet, @@ -102,6 +112,10 @@ func main() { ), "Do not restrict networked cure containers from connecting to host "+ "abstract UNIX sockets", + ).Flag( + &addr.Name, + "socket", command.StringFlag("$MBF_DAEMON_SOCKET"), + "Pathname of socket to bind to", ) c.NewCommand( @@ -267,6 +281,19 @@ func main() { ) } + c.NewCommand( + "daemon", + "Service artifact IR with Rosa OS extensions", + func(args []string) error { + ul, err := net.ListenUnix("unix", &addr) + if err != nil { + return err + } + log.Printf("listening on pathname socket at %s", addr.Name) + return serve(ctx, log.Default(), &cm, ul) + }, + ) + { var ( flagGentoo string @@ -374,6 +401,7 @@ func main() { flagDump string flagEnter bool flagExport string + flagRemote bool ) c.NewCommand( "cure", @@ -450,6 +478,13 @@ func main() { "sh", ) }) + + case flagRemote: + pathname, err := cureRemote(ctx, &addr, rosa.Std.Load(p)) + if err == nil { + log.Println(pathname) + } + return err } }, ). @@ -467,6 +502,11 @@ func main() { &flagEnter, "enter", command.BoolFlag(false), "Enter cure container with an interactive shell", + ). + Flag( + &flagRemote, + "daemon", command.BoolFlag(false), + "Cure artifact on the daemon", ) }