All checks were successful
		
		
	
	Test / Create distribution (push) Successful in 35s
				
			Test / Sandbox (push) Successful in 2m22s
				
			Test / Hpkg (push) Successful in 4m2s
				
			Test / Sandbox (race detector) (push) Successful in 4m28s
				
			Test / Hakurei (race detector) (push) Successful in 5m21s
				
			Test / Hakurei (push) Successful in 2m9s
				
			Test / Flake checks (push) Successful in 1m29s
				
			This package is quite useful. This change allows it to be imported without importing container. Signed-off-by: Ophestra <cat@gensokyo.uk>
		
			
				
	
	
		
			78 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			78 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package message
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"io"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"syscall"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	suspendBufInitial = 1 << 12
 | 
						|
	suspendBufMax     = 1 << 24
 | 
						|
)
 | 
						|
 | 
						|
// Suspendable proxies writes to a downstream [io.Writer] but optionally withholds writes
 | 
						|
// between calls to Suspend and Resume.
 | 
						|
type Suspendable struct {
 | 
						|
	Downstream io.Writer
 | 
						|
 | 
						|
	s atomic.Bool
 | 
						|
 | 
						|
	buf bytes.Buffer
 | 
						|
	// for growing buf
 | 
						|
	bufOnce sync.Once
 | 
						|
	// for synchronising all other buf operations
 | 
						|
	bufMu sync.Mutex
 | 
						|
 | 
						|
	dropped int
 | 
						|
}
 | 
						|
 | 
						|
func (s *Suspendable) Write(p []byte) (n int, err error) {
 | 
						|
	if !s.s.Load() {
 | 
						|
		return s.Downstream.Write(p)
 | 
						|
	}
 | 
						|
	s.bufOnce.Do(func() { s.buf.Grow(suspendBufInitial) })
 | 
						|
 | 
						|
	s.bufMu.Lock()
 | 
						|
	defer s.bufMu.Unlock()
 | 
						|
 | 
						|
	if free := suspendBufMax - s.buf.Len(); free < len(p) {
 | 
						|
		// fast path
 | 
						|
		if free <= 0 {
 | 
						|
			s.dropped += len(p)
 | 
						|
			return 0, syscall.ENOMEM
 | 
						|
		}
 | 
						|
 | 
						|
		n, _ = s.buf.Write(p[:free])
 | 
						|
		err = syscall.ENOMEM
 | 
						|
		s.dropped += len(p) - n
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	return s.buf.Write(p)
 | 
						|
}
 | 
						|
 | 
						|
// IsSuspended returns whether [Suspendable] is currently between a call to Suspend and Resume.
 | 
						|
func (s *Suspendable) IsSuspended() bool { return s.s.Load() }
 | 
						|
 | 
						|
// Suspend causes [Suspendable] to start withholding output in its buffer.
 | 
						|
func (s *Suspendable) Suspend() bool { return s.s.CompareAndSwap(false, true) }
 | 
						|
 | 
						|
// Resume undoes the effect of Suspend and dumps the buffered into the downstream [io.Writer].
 | 
						|
func (s *Suspendable) Resume() (resumed bool, dropped uintptr, n int64, err error) {
 | 
						|
	if s.s.CompareAndSwap(true, false) {
 | 
						|
		s.bufMu.Lock()
 | 
						|
		defer s.bufMu.Unlock()
 | 
						|
 | 
						|
		resumed = true
 | 
						|
		dropped = uintptr(s.dropped)
 | 
						|
 | 
						|
		s.dropped = 0
 | 
						|
		n, err = io.Copy(s.Downstream, &s.buf)
 | 
						|
		s.buf.Reset()
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 |