fortify/internal/fmsg/defer.go
Ophestra Umiker 866270ff05
All checks were successful
test / test (push) Successful in 27s
fmsg: add to wg prior to enqueue
Adding after channel write is racy.

Signed-off-by: Ophestra Umiker <cat@ophivana.moe>
2024-11-17 23:50:02 +09:00

98 lines
1.5 KiB
Go

package fmsg
import (
"os"
"sync"
"sync/atomic"
)
// Package-level state shared by queue, dequeue, Suspend, Resume and Exit.
var (
	// dequeueOnce guards the single start of the dequeue worker.
	dequeueOnce sync.Once

	// queueSync counts ops that were handed to queue and not yet executed.
	queueSync sync.WaitGroup

	// wstate is true while output is being withheld.
	wstate atomic.Bool

	// dropped counts the ops queue discarded; Resume reads and resets it.
	dropped atomic.Uint64

	// withhold carries the signals that park and release the dequeue worker.
	withhold = make(chan struct{}, 1)

	// msgbuf is the op queue. Ops are tiny, so the buffer is made large
	// to hold on to output while it is being withheld.
	msgbuf = make(chan dOp, 64)
)
func dequeue() {
go func() {
for {
select {
case op := <-msgbuf:
op.Do()
queueSync.Done()
case <-withhold:
<-withhold
}
}
}()
}
// queue hands op to the dequeue worker through msgbuf.
//
// The op is counted on queueSync before the send is attempted; adding to the
// wait group after the channel write would be racy. When msgbuf is full the
// outcome depends on wstate: while output is withheld the op is discarded and
// counted in dropped, otherwise the call blocks until msgbuf has room.
func queue(op dOp) {
	queueSync.Add(1)

	// fast path: the buffer has room
	select {
	case msgbuf <- op:
		return
	default:
	}

	if wstate.Load() {
		// withholding with a full buffer: give back the count taken above
		// and record the loss
		queueSync.Done()
		dropped.Add(1)
		return
	}

	// not withholding, so dequeue will get to it eventually
	msgbuf <- op
}
// dOp is a deferred operation: a unit of work that is queued now and carried
// out later by calling Do.
type dOp interface {
	// Do performs the operation.
	Do()
}
// Exit blocks until every queued op has been executed, then terminates the
// process with the given status code.
//
// Withheld output is released first. While output is withheld the dequeue
// worker does not drain msgbuf, so waiting on queueSync with ops still
// buffered would never return and the process would hang instead of exiting.
func Exit(code int) {
	// apart from making sure the worker is started, this only has an effect
	// when output is currently withheld
	Resume()
	queueSync.Wait()
	os.Exit(code)
}

// Suspend starts withholding output.
//
// It does nothing if output is already withheld. Otherwise it waits on
// queueSync for outstanding ops to finish and then signals the dequeue worker
// to stop draining msgbuf.
func Suspend() {
	dequeueOnce.Do(dequeue)
	if !wstate.CompareAndSwap(false, true) {
		// already withholding
		return
	}
	queueSync.Wait()
	withhold <- struct{}{}
}

// Resume stops withholding output.
//
// It does nothing if output is not withheld. Otherwise it signals the dequeue
// worker to continue and, if queue discarded any ops in the meantime, reports
// how many through Printf and resets the counter.
func Resume() {
	dequeueOnce.Do(dequeue)
	if !wstate.CompareAndSwap(true, false) {
		// not withholding
		return
	}
	withhold <- struct{}{}
	if d := dropped.Swap(0); d != 0 {
		Printf("dropped %d messages during withhold", d)
	}
}
// dPrint is a deferred std.Print call; its elements are the arguments.
type dPrint []any

// Do passes the stored arguments to std.Print.
func (p dPrint) Do() {
	std.Print([]any(p)...)
}
// dPrintf is a deferred std.Printf call.
type dPrintf struct {
	// format is the format string handed to std.Printf.
	format string

	// v holds the arguments that follow format.
	v []any
}

// Do passes the stored format string and arguments to std.Printf.
func (p *dPrintf) Do() {
	format, args := p.format, p.v
	std.Printf(format, args...)
}
// dPrintln is a deferred std.Println call; its elements are the arguments.
type dPrintln []any

// Do passes the stored arguments to std.Println.
func (p dPrintln) Do() {
	std.Println([]any(p)...)
}