Skip to content

Commit

Permalink
[wip]
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Hadlaw <[email protected]>
  • Loading branch information
tommyp1ckles committed May 20, 2024
1 parent 9b300d7 commit a093edb
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 24 deletions.
141 changes: 133 additions & 8 deletions cell/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cell
import (
"fmt"
"log/slog"
"reflect"
"sort"
"strings"
"sync"
Expand All @@ -20,19 +21,26 @@ type invoker struct {
// probe denotes if this invoke is a "cell.Probe" which is run before
// the lifecycle phase, but not during
probe bool
funcs []namedFunc
//funcs []namedFunc[dig.InvokeInfo]
funcs []invokerNamedFunc
}

type namedFunc struct {
type namedFunc[InfoType any] struct {
name string
fn any

infoMu sync.Mutex
info *dig.InvokeInfo
info *InfoType
}

type invokerNamedFunc = namedFunc[dig.InvokeInfo]

type InvokerList interface {
AppendInvoke(probe bool, fn func(*slog.Logger, time.Duration) error)
AppendInvoke(fn func(*slog.Logger, time.Duration) error)
}

type ProberList interface {
AppendProbe(fn func(*slog.Logger, time.Duration) error)
}

func (inv *invoker) invoke(log *slog.Logger, cont container, logThreshold time.Duration) error {
Expand Down Expand Up @@ -67,6 +75,10 @@ func (inv *invoker) invoke(log *slog.Logger, cont container, logThreshold time.D

func (inv *invoker) Apply(c container) error {
// Remember the scope in which we need to invoke.
// This func is just a wrapper for some logging stuff, by doing inv.invoke we're really running
// container.Invoke(inv.fn, ...)
// This means that when this function is called, we give the hive a Invoke to do with our fn.
// For probe, we'll want to do something similar, but we'll instead do a Provide.
invoker := func(log *slog.Logger, logThreshold time.Duration) error { return inv.invoke(log, c, logThreshold) }

// Append the invoker to the list of invoke functions. These are invoked
Expand All @@ -76,7 +88,7 @@ func (inv *invoker) Apply(c container) error {
// we don't yet know which command to run, but we still need to register
// all the flags.
return c.Invoke(func(l InvokerList) {
l.AppendInvoke(inv.probe, invoker)
l.AppendInvoke(invoker)
})
}

Expand Down Expand Up @@ -108,15 +120,128 @@ func Invoke(funcs ...any) Cell {
}

func invoke(probe bool, funcs ...any) Cell {
namedFuncs := []namedFunc{}
namedFuncs := []invokerNamedFunc{}
for _, fn := range funcs {
namedFuncs = append(
namedFuncs,
namedFunc{name: internal.FuncNameAndLocation(fn), fn: fn})
invokerNamedFunc{name: internal.FuncNameAndLocation(fn), fn: fn})
}
return &invoker{funcs: namedFuncs, probe: probe}
}

func Probe(funcs ...any) Cell {
return invoke(true, funcs...)
return invokeProbe(funcs...)
}

type provideNamedFunc namedFunc[dig.ProvideInfo]

type probe struct {
funcs []provideNamedFunc
}

func invokeProbe(funcs ...any) Cell {
namedFuncs := []provideNamedFunc{}
for _, fn := range funcs {
namedFuncs = append(
namedFuncs,
provideNamedFunc{name: internal.FuncNameAndLocation(fn), fn: fn})
}
return &probe{funcs: namedFuncs}
}

// Apply the prober to the list of probers
func (p *probe) Apply(c container) error {
// Remember the scope in which we need to invoke.
prober := func(log *slog.Logger, logThreshold time.Duration) error { return p.apply(log, c, logThreshold) }

// Append the invoker to the list of invoke functions. These are invoked
// prior to start to build up the objects. They are not invoked directly
// here as first the configuration flags need to be registered. This allows
// using hives in a command-line application with many commands and where
// we don't yet know which command to run, but we still need to register
// all the flags.

// Appends to a list of probes, these are provided after config has been created
// but before invoke, when running hive.
// TODO: Need to have some way to only run this on probe.
// By invoking this, we're adding this to a list of things to invoke later (in the hive).
return c.Invoke(func(l ProberList) {
l.AppendProbe(prober)
})
}

func (p *probe) apply(log *slog.Logger, cont container, logThreshold time.Duration) error {
for i := range p.funcs {
nf := &p.funcs[i]
log.Debug("Providing Config Probe", "function", nf.name)

v := reflect.ValueOf(nf.fn)
// Check that the config override is of type func(*cfg) and
// 'cfg' implements Flagger.
t := v.Type()
if t.Kind() != reflect.Func || t.NumIn() != 1 {
return fmt.Errorf("config override has invalid type %T, expected func(*T)", nf.fn)
}
flaggerType := reflect.TypeOf((*Flagger)(nil)).Elem()
if !t.In(0).Implements(flaggerType) {
return fmt.Errorf("config override function parameter (%T) does not implement Flagger", nf.fn)
}

// Construct the provider function: 'func() func(*cfg)'. This is
// picked up by the config cell and called to mutate the config
// after it has been parsed.
providerFunc := func(in []reflect.Value) []reflect.Value {
return []reflect.Value{v}
}
providerFuncType := reflect.FuncOf(nil, []reflect.Type{t}, false)
pfv := reflect.MakeFunc(providerFuncType, providerFunc)
if err := cont.Provide(pfv.Interface()); err != nil {
return fmt.Errorf("providing config override failed: %w", err)
}

t0 := time.Now()

var opts []dig.ProvideOption
nf.infoMu.Lock()
if nf.info == nil {
nf.info = &dig.ProvideInfo{}
opts = []dig.ProvideOption{
dig.FillProvideInfo(nf.info)}
}
defer p.funcs[i].infoMu.Unlock()

if err := cont.Provide(nf.fn, opts...); err != nil {
log.Error("Failed to provide config probe", "error", err, "function", nf.name)
return err
}
d := time.Since(t0)
if d > logThreshold {
log.Info("Probed", "duration", d, "function", nf.name)
} else {
log.Debug("Probed", "duration", d, "function", nf.name)
}

}
return nil
}

func (p *probe) Info(container) Info {
n := NewInfoNode("")
for i := range p.funcs {
namedFunc := &p.funcs[i]
namedFunc.infoMu.Lock()
defer namedFunc.infoMu.Unlock()

invNode := NewInfoNode(fmt.Sprintf("🛰️ %s", namedFunc.name))
invNode.condensed = true

var ins []string
for _, input := range namedFunc.info.Inputs {
ins = append(ins, input.String())
}
sort.Strings(ins)
invNode.AddLeaf("⇨ %s", strings.Join(ins, ", "))
n.Add(invNode)
}
return n
}
14 changes: 9 additions & 5 deletions hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ type defaults struct {
Lifecycle cell.Lifecycle
Shutdowner Shutdowner
InvokerList cell.InvokerList
ProberList cell.ProberList
EmptyFullModuleID cell.FullModuleID
DecodeHooks cell.DecodeHooks
ModuleDecorators cell.ModuleDecorators
Expand All @@ -204,6 +205,7 @@ func (h *Hive) provideDefaults() error {
Lifecycle: h.lifecycle,
Shutdowner: h,
InvokerList: h,
ProberList: h,
EmptyFullModuleID: nil,
DecodeHooks: h.opts.DecodeHooks,
ModuleDecorators: h.opts.ModuleDecorators,
Expand Down Expand Up @@ -316,6 +318,8 @@ func (h *Hive) populate(log *slog.Logger, probe bool) error {
}
}

// Probes apply provide like funcs on Config[T] types
// TODO: Can we replace the ConfigOverrides with this, it seems kinda similar?
if probe {
for _, probe := range h.probes {
if err := probe(log, h.opts.LogThreshold); err != nil {
Expand All @@ -333,14 +337,14 @@ func (h *Hive) populate(log *slog.Logger, probe bool) error {
return nil
}

func (h *Hive) AppendInvoke(probe bool, invoke func(*slog.Logger, time.Duration) error) {
if probe {
h.probes = append(h.probes, invoke)
return
}
func (h *Hive) AppendInvoke(invoke func(*slog.Logger, time.Duration) error) {
h.invokes = append(h.invokes, invoke)
}

func (h *Hive) AppendProbe(invoke func(*slog.Logger, time.Duration) error) {
h.probes = append(h.probes, invoke)
}

// Start starts the hive. The context allows cancelling the start.
// If context is cancelled and the start hooks do not respect the cancellation
// then after 5 more seconds the process will be terminated forcefully.
Expand Down
25 changes: 14 additions & 11 deletions hive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,24 +530,27 @@ func Test_Regression_Parallel_Config(t *testing.T) {
}

func TestHiveWithProbe(t *testing.T) {
orderLock := &sync.Mutex{}
order := []string{}
//orderLock := &sync.Mutex{}
/*order := []string{}
event := func(e string) {
orderLock.Lock()
defer orderLock.Unlock()
order = append(order, e)
}
}*/

probeFn := func(c Config) {
event("probe1:" + c.Foo)
}
testCell := cell.Module(
"test",
"Test Module",
cell.Config(Config{}),
cell.Probe(probeFn, probeFn),
cell.Invoke(func(c Config) {
event("invoke1")
cell.Probe(func(c Config) *Config {
return &Config{Foo: "zap"}
}),
cell.Invoke(func(c *Config) {
fmt.Println("=>", c)
fmt.Println("=>", c)
fmt.Println("=>", c)
fmt.Println("=>", c)
fmt.Println("=>", c)
}),
)

Expand All @@ -563,10 +566,10 @@ func TestHiveWithProbe(t *testing.T) {
err = h.Stop(log, context.TODO())
assert.NoError(t, err, "expected Stop to succeed")

assert.Equal(t, []string{"probe1:hello world", "probe1:hello world", "invoke1"}, order)
/*assert.Equal(t, []string{"probe1:hello world", "probe1:hello world", "invoke1"}, order)
order = nil
h = hive.New(testCell)
assert.NoError(t, h.Populate(log))
assert.Equal(t, []string{"invoke1"}, order)
assert.Equal(t, []string{"invoke1"}, order)*/
}

0 comments on commit a093edb

Please sign in to comment.