diff --git a/README.md b/README.md index fe69dc5..7eba448 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,8 @@ $ haggar -h Usage of haggar: -carbon="localhost:2003": address of carbon host -flush=10000: how often to flush metrics, in millis - -gen=10000: how often to gen new metrics, in millis + -gen=10000: how often to gen new agents, in millis -gen-size=10000: number of metrics to generate per batch + -jitter=10000: max amount of jitter to introduce in between agent launches -prefix="bench": prefix for metrics ``` diff --git a/main.go b/main.go index 6ff4695..347ff8a 100644 --- a/main.go +++ b/main.go @@ -12,45 +12,62 @@ import ( // config vars, to be manipulated via command line flags var ( - carbon string - prefix string - flush int - gen int - genSize int + carbon string + prefix string + flush int + gen int + agentSize int + jitter int ) -// generate N metrics, prefixed with batchID for tracking -func genMetrics(prefix string, batchID, n int) []string { - metrics := make([]string, n) - for i := 0; i < n; i++ { - metrics[i] = fmt.Sprintf("%s.%d.%d", prefix, batchID, i) - } +type Agent struct { + ID int + FlushInterval time.Duration + Addr string + MetricNames []string +} - return metrics +func (a *Agent) Loop() { + for { + select { + case <-time.NewTicker(a.FlushInterval).C: + err := a.Flush() + if err != nil { + log.Printf("agent %d: %s\n", a.ID, err) + } + } + } } -// flush a collection of metrics to a carbon server located at addr -// epoch is set to now -// the value is randomly generated -func flushMetrics(addr string, metrics []string) error { - conn, err := net.Dial("tcp", addr) +func (a *Agent) Flush() error { + conn, err := net.Dial("tcp", a.Addr) if err != nil { return err } defer conn.Close() epoch := time.Now().Unix() - for _, m := range metrics { - err := carbonate(conn, m, rand.Intn(1000), epoch) + for _, name := range a.MetricNames { + err := carbonate(conn, name, rand.Intn(1000), epoch) if err != nil { return err } } - log.Printf("flushed %d metrics\n", len(metrics)) + log.Printf("agent %d: flushed %d metrics\n", a.ID, len(a.MetricNames)) return nil } +// generate N metric names, prefixed with batchID for tracking +func genMetricNames(prefix string, id, n int) []string { + names := make([]string, n) + for i := 0; i < n; i++ { + names[i] = fmt.Sprintf("%s.agent.%d.metrics.%d", prefix, id, i) + } + + return names +} + // actually write the data in carbon line format func carbonate(w io.ReadWriteCloser, name string, value int, epoch int64) error { _, err := fmt.Fprintf(w, "%s %d %d\n", name, value, epoch) @@ -60,44 +77,40 @@ func carbonate(w io.ReadWriteCloser, name string, value int, epoch int64) error return nil } +func launchAgent(id, n int, flush time.Duration, addr, prefix string) { + metricNames := genMetricNames(prefix, id, n) + + a := &Agent{ID: id, FlushInterval: time.Duration(flush), Addr: addr, MetricNames: metricNames} + a.Loop() +} + func init() { flag.StringVar(&carbon, "carbon", "localhost:2003", "address of carbon host") flag.StringVar(&prefix, "prefix", "bench", "prefix for metrics") flag.IntVar(&flush, "flush", 10000, "how often to flush metrics, in millis") - flag.IntVar(&gen, "gen", 10000, "how often to gen new metrics, in millis") - flag.IntVar(&genSize, "gen-size", 10000, "number of metrics to generate per batch") + flag.IntVar(&gen, "gen", 10000, "how often to gen new agents, in millis") + flag.IntVar(&agentSize, "gen-size", 10000, "number of metrics to generate per batch") + flag.IntVar(&jitter, "jitter", 10000, "max amount of jitter to introduce in between agent launches") } func main() { flag.Parse() - genTicker := time.NewTicker(time.Duration(gen) * time.Millisecond).C - flushTicker := time.NewTicker(time.Duration(flush) * time.Millisecond).C - - metrics := make([]string, 0) - batchID := 0 + curID := 0 - // initial generation - metrics = append(metrics, genMetrics(prefix, batchID, genSize)...) - batchID++ + go launchAgent(curID, agentSize, time.Duration(flush)*time.Millisecond, carbon, prefix) + log.Printf("agent %d: launched\n", curID) + curID++ - // initial flush - err := flushMetrics(carbon, metrics) - if err != nil { - log.Fatal(err) - } - - // inner loop for { select { - case <-flushTicker: - err := flushMetrics(carbon, metrics) - if err != nil { - log.Fatal(err) - } - case <-genTicker: - metrics = append(metrics, genMetrics(prefix, batchID, genSize)...) - batchID++ + case <-time.NewTicker(time.Duration(gen) * time.Millisecond).C: + // sleep for some jitter + time.Sleep(time.Duration(rand.Intn(jitter)) * time.Millisecond) + + go launchAgent(curID, agentSize, time.Duration(flush)*time.Millisecond, carbon, prefix) + log.Printf("agent %d: launched\n", curID) + curID++ } } }