diff --git a/container/output.go b/container/output.go index 50da335..86f57c8 100644 --- a/container/output.go +++ b/container/output.go @@ -1,5 +1,81 @@ package container +import ( + "bytes" + "io" + "sync" + "sync/atomic" + "syscall" +) + +const ( + suspendBufInitial = 1 << 12 + suspendBufMax = 1 << 24 +) + +// Suspendable proxies writes to a downstream [io.Writer] but optionally withholds writes +// between calls to Suspend and Resume. +type Suspendable struct { + Downstream io.Writer + + s atomic.Bool + + buf bytes.Buffer + // for growing buf + bufOnce sync.Once + // for synchronising all other buf operations + bufMu sync.Mutex + + dropped int +} + +func (s *Suspendable) Write(p []byte) (n int, err error) { + if !s.s.Load() { + return s.Downstream.Write(p) + } + s.bufOnce.Do(func() { s.buf.Grow(suspendBufInitial) }) + + s.bufMu.Lock() + defer s.bufMu.Unlock() + + if free := suspendBufMax - s.buf.Len(); free < len(p) { + // fast path + if free <= 0 { + s.dropped += len(p) + return 0, syscall.ENOMEM + } + + n, _ = s.buf.Write(p[:free]) + err = syscall.ENOMEM + s.dropped += len(p) - n + return + } + + return s.buf.Write(p) +} + +// IsSuspended returns whether [Suspendable] is currently between a call to Suspend and Resume. +func (s *Suspendable) IsSuspended() bool { return s.s.Load() } + +// Suspend causes [Suspendable] to start withholding output in its buffer. +func (s *Suspendable) Suspend() bool { return s.s.CompareAndSwap(false, true) } + +// Resume undoes the effect of Suspend and dumps the buffered into the downstream [io.Writer]. +func (s *Suspendable) Resume() (resumed bool, dropped uintptr, n int64, err error) { + if s.s.CompareAndSwap(true, false) { + s.bufMu.Lock() + defer s.bufMu.Unlock() + + resumed = true + dropped = uintptr(s.dropped) + + s.dropped = 0 + n, err = io.Copy(s.Downstream, &s.buf) + s.buf.Reset() + } + return +} + var msg Msg = new(DefaultMsg) func GetOutput() Msg { return msg } diff --git a/container/output_test.go b/container/output_test.go index 85ef756..54eba8f 100644 --- a/container/output_test.go +++ b/container/output_test.go @@ -1,9 +1,155 @@ package container import ( + "bytes" + "errors" + "reflect" + "strconv" + "syscall" "testing" + + "hakurei.app/container/stub" ) +func TestSuspendable(t *testing.T) { + const ( + // equivalent to len(want.pt) + nSpecialPtEquiv = -iota - 1 + // equivalent to len(want.w) + nSpecialWEquiv + // suspends writer before executing test case, implies nSpecialWEquiv + nSpecialSuspend + // offset: resume writer and measure against dump instead, implies nSpecialPtEquiv + nSpecialDump + ) + + // shares the same writer + testCases := []struct { + name string + w, pt []byte + err error + wantErr error + n int + }{ + {"simple", []byte{0xde, 0xad, 0xbe, 0xef}, []byte{0xde, 0xad, 0xbe, 0xef}, + nil, nil, nSpecialPtEquiv}, + + {"error", []byte{0xb, 0xad}, []byte{0xb, 0xad}, + stub.UniqueError(0), stub.UniqueError(0), nSpecialPtEquiv}, + + {"suspend short", []byte{0}, nil, + nil, nil, nSpecialSuspend}, + {"sw short 0", []byte{0xca, 0xfe, 0xba, 0xbe}, nil, + nil, nil, nSpecialWEquiv}, + {"sw short 1", []byte{0xff}, nil, + nil, nil, nSpecialWEquiv}, + {"resume short", nil, []byte{0, 0xca, 0xfe, 0xba, 0xbe, 0xff}, nil, nil, + nSpecialDump}, + + {"long pt", bytes.Repeat([]byte{0xff}, suspendBufMax+1), bytes.Repeat([]byte{0xff}, suspendBufMax+1), + nil, nil, nSpecialPtEquiv}, + + {"suspend fill", bytes.Repeat([]byte{0xfe}, suspendBufMax), nil, + nil, nil, nSpecialSuspend}, + {"drop", []byte{0}, nil, + nil, syscall.ENOMEM, 0}, + {"drop error", []byte{0}, nil, + stub.UniqueError(1), syscall.ENOMEM, 0}, + {"resume fill", nil, bytes.Repeat([]byte{0xfe}, suspendBufMax), + nil, nil, nSpecialDump - 2}, + + {"suspend fill partial", bytes.Repeat([]byte{0xfd}, suspendBufMax-0xf), nil, + nil, nil, nSpecialSuspend}, + {"partial write", bytes.Repeat([]byte{0xad}, 0x1f), nil, + nil, syscall.ENOMEM, 0xf}, + {"full drop", []byte{0}, nil, + nil, syscall.ENOMEM, 0}, + {"resume fill partial", nil, append(bytes.Repeat([]byte{0xfd}, suspendBufMax-0xf), bytes.Repeat([]byte{0xad}, 0xf)...), + nil, nil, nSpecialDump - 0x10 - 1}, + } + + var dw expectWriter + + w := Suspendable{Downstream: &dw} + for _, tc := range testCases { + // these share the same writer, so cannot be subtests + t.Logf("writing step %q", tc.name) + dw.expect, dw.err = tc.pt, tc.err + + var ( + gotN int + gotErr error + ) + + wantN := tc.n + switch wantN { + case nSpecialPtEquiv: + wantN = len(tc.pt) + gotN, gotErr = w.Write(tc.w) + + case nSpecialWEquiv: + wantN = len(tc.w) + gotN, gotErr = w.Write(tc.w) + + case nSpecialSuspend: + s := w.IsSuspended() + if ok := w.Suspend(); s && ok { + t.Fatal("Suspend: unexpected success") + } + + wantN = len(tc.w) + gotN, gotErr = w.Write(tc.w) + + default: + if wantN <= nSpecialDump { + if !w.IsSuspended() { + t.Fatal("IsSuspended unexpected false") + } + + resumed, dropped, n, err := w.Resume() + if !resumed { + t.Fatal("Resume: resumed = false") + } + if wantDropped := nSpecialDump - wantN; int(dropped) != wantDropped { + t.Errorf("Resume: dropped = %d, want %d", dropped, wantDropped) + } + + wantN = len(tc.pt) + gotN, gotErr = int(n), err + } else { + gotN, gotErr = w.Write(tc.w) + } + } + + if gotN != wantN { + t.Errorf("Write: n = %d, want %d", gotN, wantN) + } + + if !reflect.DeepEqual(gotErr, tc.wantErr) { + t.Errorf("Write: %v", gotErr) + } + } +} + +// expectWriter compares Write calls to expect. +type expectWriter struct { + expect []byte + err error +} + +func (w *expectWriter) Write(p []byte) (n int, err error) { + defer func() { w.expect = nil }() + + n, err = len(p), w.err + if w.expect == nil { + return 0, errors.New("unexpected call to Write: " + strconv.Quote(string(p))) + } + if string(p) != string(w.expect) { + return 0, errors.New("p = " + strconv.Quote(string(p)) + ", want " + strconv.Quote(string(w.expect))) + } + return +} + func TestGetSetOutput(t *testing.T) { { out := GetOutput() diff --git a/internal/hlog/hlog.go b/internal/hlog/hlog.go index 415d16b..ff9b201 100644 --- a/internal/hlog/hlog.go +++ b/internal/hlog/hlog.go @@ -2,69 +2,17 @@ package hlog import ( - "bytes" - "io" "log" "os" - "sync" - "sync/atomic" - "syscall" + + "hakurei.app/container" ) -const ( - bufSize = 4 * 1024 - bufSizeMax = 16 * 1024 * 1024 -) - -var o = &suspendable{w: os.Stderr} +var o = &container.Suspendable{Downstream: os.Stderr} // Prepare configures the system logger for [Suspend] and [Resume] to take effect. func Prepare(prefix string) { log.SetPrefix(prefix + ": "); log.SetFlags(0); log.SetOutput(o) } -type suspendable struct { - w io.Writer - s atomic.Bool - - buf bytes.Buffer - bufOnce sync.Once - bufMu sync.Mutex - dropped int -} - -func (s *suspendable) Write(p []byte) (n int, err error) { - if !s.s.Load() { - return s.w.Write(p) - } - s.bufOnce.Do(func() { s.prepareBuf() }) - - s.bufMu.Lock() - defer s.bufMu.Unlock() - - if l := len(p); s.buf.Len()+l > bufSizeMax { - s.dropped += l - return 0, syscall.ENOMEM - } - return s.buf.Write(p) -} - -func (s *suspendable) prepareBuf() { s.buf.Grow(bufSize) } -func (s *suspendable) Suspend() bool { return o.s.CompareAndSwap(false, true) } -func (s *suspendable) Resume() (resumed bool, dropped uintptr, n int64, err error) { - if o.s.CompareAndSwap(true, false) { - o.bufMu.Lock() - defer o.bufMu.Unlock() - - resumed = true - dropped = uintptr(o.dropped) - - o.dropped = 0 - n, err = io.Copy(s.w, &s.buf) - s.buf = bytes.Buffer{} - s.prepareBuf() - } - return -} - func Suspend() bool { return o.Suspend() } func Resume() bool { resumed, dropped, _, err := o.Resume()