Skip to content

Commit

Permalink
Agentify
Browse files Browse the repository at this point in the history
ref #3

* introduces the idea of an agent to model something like collectd
* agents have a configurable metric size and flush rate
* agents are spawned every N millis, with some jitter
  • Loading branch information
Michael Gorsuch committed Aug 6, 2014
1 parent 251847d commit 9f012f3
Showing 1 changed file with 58 additions and 45 deletions.
103 changes: 58 additions & 45 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,62 @@ import (

// config vars, to be manipulated via command line flags
var (
carbon string
prefix string
flush int
gen int
genSize int
carbon string
prefix string
flush int
gen int
agentSize int
jitter int
)

// generate N metrics, prefixed with batchID for tracking
func genMetrics(prefix string, batchID, n int) []string {
metrics := make([]string, n)
for i := 0; i < n; i++ {
metrics[i] = fmt.Sprintf("%s.%d.%d", prefix, batchID, i)
}
type Agent struct {
ID int
FlushInterval time.Duration
Addr string
MetricNames []string
}

return metrics
func (a *Agent) Loop() {
for {
select {
case <-time.NewTicker(a.FlushInterval).C:
err := a.Flush()
if err != nil {
log.Printf("agent %d: %s\n", a.ID, err)
}
}
}
}

// flush a collection of metrics to a carbon server located at addr
// epoch is set to now
// the value is randomly generated
func flushMetrics(addr string, metrics []string) error {
conn, err := net.Dial("tcp", addr)
func (a *Agent) Flush() error {
conn, err := net.Dial("tcp", a.Addr)
if err != nil {
return err
}
defer conn.Close()

epoch := time.Now().Unix()
for _, m := range metrics {
err := carbonate(conn, m, rand.Intn(1000), epoch)
for _, name := range a.MetricNames {
err := carbonate(conn, name, rand.Intn(1000), epoch)
if err != nil {
return err
}
}

log.Printf("flushed %d metrics\n", len(metrics))
log.Printf("agent %d: flushed %d metrics\n", a.ID, len(a.MetricNames))
return nil
}

// generate N metric names, prefixed with batchID for tracking
func genMetricNames(prefix string, id, n int) []string {
names := make([]string, n)
for i := 0; i < n; i++ {
names[i] = fmt.Sprintf("%s.agent.%d.metrics.%d", prefix, id, i)
}

return names
}

// actually write the data in carbon line format
func carbonate(w io.ReadWriteCloser, name string, value int, epoch int64) error {
_, err := fmt.Fprintf(w, "%s %d %d\n", name, value, epoch)
Expand All @@ -60,44 +77,40 @@ func carbonate(w io.ReadWriteCloser, name string, value int, epoch int64) error
return nil
}

func launchAgent(id, n int, flush time.Duration, addr, prefix string) {
metricNames := genMetricNames(prefix, id, n)

a := &Agent{ID: id, FlushInterval: time.Duration(flush), Addr: addr, MetricNames: metricNames}
a.Loop()
}

func init() {
flag.StringVar(&carbon, "carbon", "localhost:2003", "address of carbon host")
flag.StringVar(&prefix, "prefix", "bench", "prefix for metrics")
flag.IntVar(&flush, "flush", 10000, "how often to flush metrics, in millis")
flag.IntVar(&gen, "gen", 10000, "how often to gen new metrics, in millis")
flag.IntVar(&genSize, "gen-size", 10000, "number of metrics to generate per batch")
flag.IntVar(&gen, "gen", 10000, "how often to gen new agents, in millis")
flag.IntVar(&agentSize, "gen-size", 10000, "number of metrics to generate per batch")
flag.IntVar(&jitter, "jitter", 10000, "max amount of jitter to introduce in between agent launches")
}

func main() {
flag.Parse()

genTicker := time.NewTicker(time.Duration(gen) * time.Millisecond).C
flushTicker := time.NewTicker(time.Duration(flush) * time.Millisecond).C

metrics := make([]string, 0)
batchID := 0
curID := 0

// initial generation
metrics = append(metrics, genMetrics(prefix, batchID, genSize)...)
batchID++
go launchAgent(curID, agentSize, time.Duration(flush)*time.Millisecond, carbon, prefix)
log.Printf("agent %d: launched\n", curID)
curID++

// initial flush
err := flushMetrics(carbon, metrics)
if err != nil {
log.Fatal(err)
}

// inner loop
for {
select {
case <-flushTicker:
err := flushMetrics(carbon, metrics)
if err != nil {
log.Fatal(err)
}
case <-genTicker:
metrics = append(metrics, genMetrics(prefix, batchID, genSize)...)
batchID++
case <-time.NewTicker(time.Duration(gen) * time.Millisecond).C:
// sleep for some jitter
time.Sleep(time.Duration(rand.Intn(jitter)) * time.Millisecond)

go launchAgent(curID, agentSize, time.Duration(flush)*time.Millisecond, carbon, prefix)
log.Printf("agent %d: launched\n", curID)
curID++
}
}
}

0 comments on commit 9f012f3

Please sign in to comment.