cmd/mbf: optionally ignore reply
All checks were successful
Test / Create distribution (push) Successful in 1m3s
Test / Sandbox (push) Successful in 2m57s
Test / Hakurei (push) Successful in 3m52s
Test / ShareFS (push) Successful in 4m58s
Test / Sandbox (race detector) (push) Successful in 5m17s
Test / Hakurei (race detector) (push) Successful in 6m29s
Test / Flake checks (push) Successful in 1m47s

An acknowledgement is not always required in this use case. This change also adds 64 bits of connection configuration for future expansion.

Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
2026-04-17 16:46:49 +09:00
parent a394971dd7
commit 9036986156
3 changed files with 57 additions and 22 deletions

View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"encoding/binary"
"errors"
"io"
"log"
@@ -27,21 +28,28 @@ func daemonDeadline() time.Time {
return time.Now().Add(daemonTimeout)
}
const (
// remoteNoReply notifies that the client will not receive a cure reply.
remoteNoReply = 1 << iota
)
// cureFromIR services an IR curing request.
func cureFromIR(
ctx context.Context,
cache *pkg.Cache,
conn net.Conn,
) (pkg.Artifact, error) {
go func() {
<-ctx.Done()
_ = conn.SetDeadline(time.Now())
}()
go func() { <-ctx.Done(); _ = conn.SetDeadline(time.Now()) }()
if err := conn.SetReadDeadline(daemonDeadline()); err != nil {
return nil, errors.Join(err, conn.Close())
}
var flagsWire [8]byte
if _, err := io.ReadFull(conn, flagsWire[:]); err != nil {
return nil, errors.Join(err, conn.Close())
}
flags := binary.LittleEndian.Uint64(flagsWire[:])
a, decodeErr := cache.NewDecoder(conn).Decode()
if decodeErr != nil {
_, err := conn.Write([]byte("\x00" + decodeErr.Error()))
@@ -49,10 +57,11 @@ func cureFromIR(
}
pathname, _, cureErr := cache.Cure(a)
if flags&remoteNoReply != 0 {
return a, errors.Join(cureErr, conn.Close())
}
if err := conn.SetWriteDeadline(daemonDeadline()); err != nil {
if !testing.Testing() || !errors.Is(err, io.ErrClosedPipe) {
return a, errors.Join(err, conn.Close())
}
return a, errors.Join(cureErr, err, conn.Close())
}
if cureErr != nil {
_, err := conn.Write([]byte("\x00" + cureErr.Error()))
@@ -66,7 +75,12 @@ func cureFromIR(
}
// serve services connections from a [net.UnixListener].
func serve(ctx context.Context, log *log.Logger, cm *cache, ul *net.UnixListener) error {
func serve(
ctx context.Context,
log *log.Logger,
cm *cache,
ul *net.UnixListener,
) error {
ul.SetUnlinkOnClose(true)
if cm.c == nil {
if err := cm.open(); err != nil {
@@ -116,16 +130,19 @@ func cureRemote(
ctx context.Context,
addr *net.UnixAddr,
a pkg.Artifact,
flags uint64,
) (*check.Absolute, error) {
conn, err := net.DialUnix("unix", nil, addr)
if err != nil {
return nil, err
}
go func() { <-ctx.Done(); _ = conn.SetDeadline(time.Now()) }()
go func() {
<-ctx.Done()
_ = conn.SetDeadline(time.Now())
}()
if n, flagErr := conn.Write(binary.LittleEndian.AppendUint64(nil, flags)); flagErr != nil {
return nil, errors.Join(flagErr, conn.Close())
} else if n != 8 {
return nil, errors.Join(io.ErrShortWrite, conn.Close())
}
if err = pkg.NewIR().EncodeAll(conn, a); err != nil {
return nil, errors.Join(err, conn.Close())
@@ -133,6 +150,10 @@ func cureRemote(
return nil, errors.Join(err, conn.Close())
}
if flags&remoteNoReply != 0 {
return nil, conn.Close()
}
payload, recvErr := io.ReadAll(conn)
if err = errors.Join(recvErr, conn.Close()); err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {