hakurei/container/output.go
Ophestra e906cae9ee
All checks were successful
Test / Create distribution (push) Successful in 33s
Test / Sandbox (push) Successful in 2m12s
Test / Hakurei (push) Successful in 3m13s
Test / Hpkg (push) Successful in 4m1s
Test / Sandbox (race detector) (push) Successful in 4m34s
Test / Hakurei (race detector) (push) Successful in 5m14s
Test / Flake checks (push) Successful in 1m27s
container/output: export suspendable writer
This is quite useful for other packages as well. This change prepares internal/hlog for removal.

Signed-off-by: Ophestra <cat@gensokyo.uk>
2025-09-27 19:46:35 +09:00

89 lines
1.8 KiB
Go

package container
import (
"bytes"
"io"
"sync"
"sync/atomic"
"syscall"
)
// Sizing of the buffer that holds output withheld by a suspended [Suspendable].
const (
	// suspendBufInitial is the capacity in bytes requested for the buffer
	// the first time a write is withheld.
	suspendBufInitial = 4 << 10
	// suspendBufMax is the largest number of bytes the buffer may hold;
	// withheld output beyond this is dropped.
	suspendBufMax = 16 << 20
)
// Suspendable proxies writes to a downstream [io.Writer] but optionally withholds writes
// between calls to Suspend and Resume.
//
// Withheld writes are kept in an in-memory buffer holding at most suspendBufMax bytes and
// are copied to Downstream by Resume. Because it contains a [sync.Mutex], a Suspendable
// must not be copied after first use.
type Suspendable struct {
// Downstream receives writes directly while not suspended, and the buffered
// output when Resume is called.
Downstream io.Writer
// s is true between a successful Suspend and the following Resume.
s atomic.Bool
// buf holds output withheld while suspended.
buf bytes.Buffer
// bufOnce performs the one-time initial growth of buf.
bufOnce sync.Once
// bufMu synchronises buffered writes, the flush in Resume, and access to dropped.
bufMu sync.Mutex
// dropped counts bytes discarded because buf was full; it is reset by Resume.
dropped int
}
// Write implements [io.Writer].
//
// While not suspended, p is passed straight to Downstream and its result is returned.
// While suspended, p is appended to the internal buffer instead. The buffer holds at most
// suspendBufMax bytes: when p does not fit, as much of it as fits is kept, the remainder
// is added to the dropped count reported by Resume, and the returned error is
// [syscall.ENOMEM] with n set to the number of bytes actually buffered.
func (s *Suspendable) Write(p []byte) (n int, err error) {
	if !s.s.Load() {
		return s.Downstream.Write(p)
	}

	s.bufMu.Lock()
	if !s.s.Load() {
		// Resume cleared the flag between the check above and acquiring the lock.
		// Buffering now would strand p until some later Suspend/Resume cycle, so
		// pass it through; the lock is released first so Downstream is never
		// written to from here while holding bufMu.
		s.bufMu.Unlock()
		return s.Downstream.Write(p)
	}
	defer s.bufMu.Unlock()

	// Grow while holding bufMu: Resume reads and resets buf under the same lock, so
	// growing outside of it would be a data race on buf.
	s.bufOnce.Do(func() { s.buf.Grow(suspendBufInitial) })

	free := suspendBufMax - s.buf.Len()
	if free >= len(p) {
		return s.buf.Write(p)
	}

	// p does not fit in full: keep the prefix that does (if any) and drop the rest.
	if free > 0 {
		// bytes.Buffer.Write always returns a nil error, so it is safe to discard.
		n, _ = s.buf.Write(p[:free])
	}
	s.dropped += len(p) - n
	return n, syscall.ENOMEM
}
// IsSuspended reports whether output is currently being withheld, that is, whether
// Suspend has been called without a subsequent Resume.
func (s *Suspendable) IsSuspended() bool {
	suspended := s.s.Load()
	return suspended
}
// Suspend switches [Suspendable] into withholding mode, in which writes accumulate in its
// buffer instead of reaching Downstream. It reports whether this call made the
// transition; false means it was already suspended.
func (s *Suspendable) Suspend() bool {
	const (
		running   = false
		suspended = true
	)
	return s.s.CompareAndSwap(running, suspended)
}
// Resume undoes the effect of Suspend and copies the buffered output into the downstream
// [io.Writer].
//
// resumed reports whether this call ended a suspension; when it is false nothing else was
// done and the remaining results are zero. Otherwise dropped is the number of bytes that
// were discarded while suspended, and n and err are the result of copying the buffer to
// Downstream. The buffer and the dropped count are cleared regardless of err.
func (s *Suspendable) Resume() (resumed bool, dropped uintptr, n int64, err error) {
	if !s.s.CompareAndSwap(true, false) {
		// not suspended, nothing to flush
		return false, 0, 0, nil
	}

	s.bufMu.Lock()
	defer s.bufMu.Unlock()

	// take the dropped count and clear it for the next suspension
	dropped, s.dropped = uintptr(s.dropped), 0

	n, err = io.Copy(s.Downstream, &s.buf)
	s.buf.Reset()
	return true, dropped, n, err
}
// msg is the package-wide [Msg] returned by GetOutput and replaced by SetOutput.
// It starts out as a new DefaultMsg.
// NOTE(review): msg is read and assigned without synchronisation in this file; confirm
// that callers do not call SetOutput concurrently with its use.
var msg Msg = new(DefaultMsg)
// GetOutput returns the [Msg] currently held by the package.
func GetOutput() Msg {
	return msg
}
// SetOutput replaces the [Msg] held by the package with v.
// Passing nil installs a new DefaultMsg instead.
func SetOutput(v Msg) {
	if v != nil {
		msg = v
		return
	}
	msg = new(DefaultMsg)
}