internal/kobject: pass action kind for range
All checks were successful
Test / Create distribution (push) Successful in 1m5s
Test / Sandbox (push) Successful in 2m52s
Test / ShareFS (push) Successful in 3m47s
Test / Hakurei (push) Successful in 3m52s
Test / Sandbox (race detector) (push) Successful in 5m24s
Test / Hakurei (race detector) (push) Successful in 6m33s
Test / Flake checks (push) Successful in 1m22s
All checks were successful
Test / Create distribution (push) Successful in 1m5s
Test / Sandbox (push) Successful in 2m52s
Test / ShareFS (push) Successful in 3m47s
Test / Hakurei (push) Successful in 3m52s
Test / Sandbox (race detector) (push) Successful in 5m24s
Test / Hakurei (race detector) (push) Successful in 6m33s
Test / Flake checks (push) Successful in 1m22s
Useful for handling most uevents. Signed-off-by: Ophestra <cat@gensokyo.uk>
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
|||||||
"hakurei.app/internal/kobject"
|
"hakurei.app/internal/kobject"
|
||||||
"hakurei.app/internal/report"
|
"hakurei.app/internal/report"
|
||||||
"hakurei.app/internal/uevent"
|
"hakurei.app/internal/uevent"
|
||||||
|
"hakurei.app/message"
|
||||||
)
|
)
|
||||||
|
|
||||||
var r report.Reporter
|
var r report.Reporter
|
||||||
@@ -79,6 +80,8 @@ const (
|
|||||||
// optionSystem specifies devpath of the system device.
|
// optionSystem specifies devpath of the system device.
|
||||||
optionSystem = "system"
|
optionSystem = "system"
|
||||||
|
|
||||||
|
// flagVerbose increases output verbosity.
|
||||||
|
flagVerbose = "verbose"
|
||||||
// flagStrict sets [report.DStrict] on r.
|
// flagStrict sets [report.DStrict] on r.
|
||||||
flagStrict = "strict"
|
flagStrict = "strict"
|
||||||
// flagNoRecover sets [report.DNoRecover] on r.
|
// flagNoRecover sets [report.DNoRecover] on r.
|
||||||
@@ -116,6 +119,9 @@ func main() {
|
|||||||
r.SetFlags(flag)
|
r.SetFlags(flag)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msg := message.New(log.Default())
|
||||||
|
msg.SwapVerbose(slices.Contains(flags, flagVerbose))
|
||||||
|
|
||||||
mustSyscall("mount devtmpfs", Mount(
|
mustSyscall("mount devtmpfs", Mount(
|
||||||
"devtmpfs",
|
"devtmpfs",
|
||||||
"/dev/",
|
"/dev/",
|
||||||
@@ -193,7 +199,7 @@ func main() {
|
|||||||
must1(rand.Read(uuid[:]))
|
must1(rand.Read(uuid[:]))
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
go consume(ctx, &r, conn, uuid, events)
|
go consume(ctx, msg, &r, conn, uuid, events)
|
||||||
s := kobject.New(uuid, func(o *kobject.Object, env map[string]string) {
|
s := kobject.New(uuid, func(o *kobject.Object, env map[string]string) {
|
||||||
log.Printf("change %s: %q", o.DevPath, env)
|
log.Printf("change %s: %q", o.DevPath, env)
|
||||||
}, func(err error) {
|
}, func(err error) {
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"hakurei.app/check"
|
"hakurei.app/check"
|
||||||
"hakurei.app/fhs"
|
"hakurei.app/fhs"
|
||||||
"hakurei.app/internal/kobject"
|
"hakurei.app/internal/kobject"
|
||||||
|
"hakurei.app/internal/uevent"
|
||||||
)
|
)
|
||||||
|
|
||||||
// mustMountSystem waits for and mounts a system device matching pattern.
|
// mustMountSystem waits for and mounts a system device matching pattern.
|
||||||
@@ -26,8 +27,9 @@ func mustMountSystem(
|
|||||||
for {
|
for {
|
||||||
var matchErr error
|
var matchErr error
|
||||||
var systemPath *check.Absolute
|
var systemPath *check.Absolute
|
||||||
s.Range(c, func(o *kobject.Object) bool {
|
s.Range(c, func(o *kobject.Object, act uevent.KobjectAction) bool {
|
||||||
if o.Subsystem != "block" ||
|
if (act != uevent.KOBJ_ADD && act != uevent.KOBJ_CHANGE) ||
|
||||||
|
o.Subsystem != "block" ||
|
||||||
o.Env["DEVTYPE"] != "disk" {
|
o.Env["DEVTYPE"] != "disk" {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,12 +2,12 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"hakurei.app/fhs"
|
"hakurei.app/fhs"
|
||||||
"hakurei.app/internal/report"
|
"hakurei.app/internal/report"
|
||||||
"hakurei.app/internal/uevent"
|
"hakurei.app/internal/uevent"
|
||||||
|
"hakurei.app/message"
|
||||||
)
|
)
|
||||||
|
|
||||||
// newRejectColdboot returns a function to be called on every subsequent pending
|
// newRejectColdboot returns a function to be called on every subsequent pending
|
||||||
@@ -56,6 +56,7 @@ func newRejectColdboot() func() bool {
|
|||||||
// consume continuously consumes events from conn with retries.
|
// consume continuously consumes events from conn with retries.
|
||||||
func consume(
|
func consume(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
msg message.Msg,
|
||||||
r *report.Reporter,
|
r *report.Reporter,
|
||||||
conn *uevent.Conn,
|
conn *uevent.Conn,
|
||||||
uuid uevent.UUID,
|
uuid uevent.UUID,
|
||||||
@@ -67,7 +68,7 @@ func consume(
|
|||||||
coldboot := true
|
coldboot := true
|
||||||
retry:
|
retry:
|
||||||
if dispatchErr := conn.Consume(ctx, fhs.Sys, &uuid, events, coldboot, func(path string) {
|
if dispatchErr := conn.Consume(ctx, fhs.Sys, &uuid, events, coldboot, func(path string) {
|
||||||
log.Println("coldboot visited", path)
|
msg.Verbose("coldboot visited", path)
|
||||||
}, func(err error) bool {
|
}, func(err error) bool {
|
||||||
if _, ok := err.(uevent.NeedsColdboot); ok && !nextColdboot() {
|
if _, ok := err.(uevent.NeedsColdboot); ok && !nextColdboot() {
|
||||||
r.Dispatch(
|
r.Dispatch(
|
||||||
|
|||||||
@@ -101,7 +101,7 @@ func (o *Object) update(env map[string]string, strip bool) {
|
|||||||
// A pendingIterator is a callback currently iterating through objects targeted
|
// A pendingIterator is a callback currently iterating through objects targeted
|
||||||
// by ongoing events.
|
// by ongoing events.
|
||||||
type pendingIterator struct {
|
type pendingIterator struct {
|
||||||
f func(o *Object) bool
|
f func(o *Object, act uevent.KobjectAction) bool
|
||||||
done chan<- struct{}
|
done chan<- struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,12 +150,12 @@ func (s *State) deleteIter(p *pendingIterator) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// dispatchIter broadcasts an [Object] to all alive iterators.
|
// dispatchIter broadcasts an [Object] to all alive iterators.
|
||||||
func (s *State) dispatchIter(o *Object) {
|
func (s *State) dispatchIter(o *Object, act uevent.KobjectAction) {
|
||||||
s.iterMu.Lock()
|
s.iterMu.Lock()
|
||||||
defer s.iterMu.Unlock()
|
defer s.iterMu.Unlock()
|
||||||
|
|
||||||
for _, p := range s.iter {
|
for _, p := range s.iter {
|
||||||
if !p.f(o) {
|
if !p.f(o, act) {
|
||||||
s.deleteIter(p)
|
s.deleteIter(p)
|
||||||
close(p.done)
|
close(p.done)
|
||||||
}
|
}
|
||||||
@@ -165,14 +165,17 @@ func (s *State) dispatchIter(o *Object) {
|
|||||||
// Range calls f on all current and upcoming [Object] values tracked by s until
|
// Range calls f on all current and upcoming [Object] values tracked by s until
|
||||||
// f returns false or the context is cancelled. f must not retain o or modify
|
// f returns false or the context is cancelled. f must not retain o or modify
|
||||||
// the value it points to.
|
// the value it points to.
|
||||||
func (s *State) Range(ctx context.Context, f func(o *Object) bool) {
|
func (s *State) Range(
|
||||||
|
ctx context.Context,
|
||||||
|
f func(o *Object, act uevent.KobjectAction) bool,
|
||||||
|
) {
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
p := pendingIterator{f, done}
|
p := pendingIterator{f, done}
|
||||||
|
|
||||||
s.iterMu.Lock()
|
s.iterMu.Lock()
|
||||||
s.ueventMu.RLock()
|
s.ueventMu.RLock()
|
||||||
for _, o := range s.uevent {
|
for _, o := range s.uevent {
|
||||||
if !f(o) {
|
if !f(o, uevent.KOBJ_ADD) {
|
||||||
s.ueventMu.RUnlock()
|
s.ueventMu.RUnlock()
|
||||||
s.iterMu.Unlock()
|
s.iterMu.Unlock()
|
||||||
return
|
return
|
||||||
@@ -296,13 +299,13 @@ func (s *State) processEvent(e *Event) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
switch e.Action {
|
switch act := e.Action; act {
|
||||||
case uevent.KOBJ_ADD:
|
case uevent.KOBJ_ADD:
|
||||||
if e.Synth == nil {
|
if e.Synth == nil {
|
||||||
if o, ok := s.uevent[e.DevPath]; ok {
|
if o, ok := s.uevent[e.DevPath]; ok {
|
||||||
s.reportErr(e.NewError(EDuplicateAdd, o))
|
s.reportErr(e.NewError(EDuplicateAdd, o))
|
||||||
o.merge(e.Env)
|
o.merge(e.Env)
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -312,7 +315,7 @@ func (s *State) processEvent(e *Event) {
|
|||||||
}
|
}
|
||||||
o.merge(e.Env)
|
o.merge(e.Env)
|
||||||
s.uevent[e.DevPath] = o
|
s.uevent[e.DevPath] = o
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
|
|
||||||
case uevent.KOBJ_REMOVE:
|
case uevent.KOBJ_REMOVE:
|
||||||
@@ -338,14 +341,14 @@ func (s *State) processEvent(e *Event) {
|
|||||||
o = e.makeColdboot()
|
o = e.makeColdboot()
|
||||||
o.merge(e.Env)
|
o.merge(e.Env)
|
||||||
s.uevent[e.DevPath] = o
|
s.uevent[e.DevPath] = o
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
o.update(e.Env, true)
|
o.update(e.Env, true)
|
||||||
if s.handleChange != nil {
|
if s.handleChange != nil {
|
||||||
s.handleChange(o, e.Env)
|
s.handleChange(o, e.Env)
|
||||||
}
|
}
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
|
|
||||||
case uevent.KOBJ_MOVE:
|
case uevent.KOBJ_MOVE:
|
||||||
@@ -368,7 +371,7 @@ func (s *State) processEvent(e *Event) {
|
|||||||
o.merge(e.Env)
|
o.merge(e.Env)
|
||||||
s.uevent[e.DevPath] = o
|
s.uevent[e.DevPath] = o
|
||||||
o.DevPath = e.DevPath
|
o.DevPath = e.DevPath
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
|
|
||||||
case uevent.KOBJ_ONLINE:
|
case uevent.KOBJ_ONLINE:
|
||||||
@@ -384,7 +387,7 @@ func (s *State) processEvent(e *Event) {
|
|||||||
s.reportErr(e.NewError(EUnexpectedOffline, o))
|
s.reportErr(e.NewError(EUnexpectedOffline, o))
|
||||||
}
|
}
|
||||||
o.Offline = false
|
o.Offline = false
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
|
|
||||||
case uevent.KOBJ_OFFLINE:
|
case uevent.KOBJ_OFFLINE:
|
||||||
@@ -400,7 +403,7 @@ func (s *State) processEvent(e *Event) {
|
|||||||
s.reportErr(e.NewError(EUnexpectedOffline, o))
|
s.reportErr(e.NewError(EUnexpectedOffline, o))
|
||||||
}
|
}
|
||||||
o.Offline = true
|
o.Offline = true
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
|
|
||||||
case uevent.KOBJ_BIND:
|
case uevent.KOBJ_BIND:
|
||||||
@@ -416,7 +419,7 @@ func (s *State) processEvent(e *Event) {
|
|||||||
}
|
}
|
||||||
o.State = StateBound
|
o.State = StateBound
|
||||||
o.merge(e.Env)
|
o.merge(e.Env)
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
|
|
||||||
case uevent.KOBJ_UNBIND:
|
case uevent.KOBJ_UNBIND:
|
||||||
@@ -432,7 +435,7 @@ func (s *State) processEvent(e *Event) {
|
|||||||
}
|
}
|
||||||
o.State = StateNew
|
o.State = StateNew
|
||||||
o.Driver = ""
|
o.Driver = ""
|
||||||
s.dispatchIter(o)
|
s.dispatchIter(o, act)
|
||||||
return
|
return
|
||||||
|
|
||||||
default: // not reached
|
default: // not reached
|
||||||
|
|||||||
@@ -388,7 +388,9 @@ func TestIter(t *testing.T) {
|
|||||||
"SEQNUM=1",
|
"SEQNUM=1",
|
||||||
}}
|
}}
|
||||||
synctest.Wait()
|
synctest.Wait()
|
||||||
s.Range(t.Context(), func(o *Object) bool { return false })
|
s.Range(t.Context(), func(*Object, uevent.KobjectAction) bool {
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
var got []*Object
|
var got []*Object
|
||||||
check := func(want []*Object) {
|
check := func(want []*Object) {
|
||||||
@@ -405,7 +407,7 @@ func TestIter(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
var done bool
|
var done bool
|
||||||
wg.Go(func() {
|
wg.Go(func() {
|
||||||
s.Range(ctx, func(o *Object) bool {
|
s.Range(ctx, func(o *Object, _ uevent.KobjectAction) bool {
|
||||||
got = append(got, o.Clone())
|
got = append(got, o.Clone())
|
||||||
return !done
|
return !done
|
||||||
})
|
})
|
||||||
@@ -437,7 +439,11 @@ func TestIter(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
wg.Go(func() { s.Range(ctx, func(*Object) bool { return true }) })
|
wg.Go(func() {
|
||||||
|
s.Range(ctx, func(*Object, uevent.KobjectAction) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
})
|
||||||
synctest.Wait()
|
synctest.Wait()
|
||||||
|
|
||||||
iter := reflect.ValueOf(s).Elem().FieldByName("iter")
|
iter := reflect.ValueOf(s).Elem().FieldByName("iter")
|
||||||
|
|||||||
Reference in New Issue
Block a user