From 7ba9e46c6de1a828b4ad32ab9fdd4f04166c6f36 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Fri, 11 Oct 2019 19:06:04 +0300 Subject: [PATCH 01/29] added prometheus and dump sender --- .gitignore | 1 + Dockerfile | 1 + clickhouse.go | 22 ++++++- clickhouse_test.go | 18 +----- collector.go | 16 +++-- collector_test.go | 5 +- dump.go | 144 +++++++++++++++++++++++++++++++++++++++++++++ dump_test.go | 33 +++++++++++ go.mod | 5 +- go.sum | 64 ++++++++++++++++++++ main.go | 34 +++++++++++ main_test.go | 54 ++++++++++++++++- sender.go | 16 ++++- server.go | 9 ++- server_test.go | 9 +-- utils.go | 24 -------- 16 files changed, 394 insertions(+), 61 deletions(-) create mode 100644 dump.go create mode 100644 dump_test.go diff --git a/.gitignore b/.gitignore index e79014a..35ba4ef 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ _testmain.go *.exe *.test *.prof +dumptest .vscode .idea/ diff --git a/Dockerfile b/Dockerfile index 1b1a865..e4e0b2a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,6 +18,7 @@ RUN go build -v FROM alpine:latest RUN apk add ca-certificates WORKDIR /app +RUN mkdir /app/dumps COPY --from=builder /go/src/github.com/nikepan/clickhouse-bulk/config.sample.json . COPY --from=builder /go/src/github.com/nikepan/clickhouse-bulk/clickhouse-bulk . EXPOSE 8123 diff --git a/clickhouse.go b/clickhouse.go index 784bf61..7cf0dba 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -52,6 +52,23 @@ func (c *Clickhouse) AddServer(url string) { c.Servers = append(c.Servers, &ClickhouseServer{URL: url, Client: &http.Client{}}) } +// DumpServers - dump servers state to prometheus +func (c *Clickhouse) DumpServers() { + c.mu.Lock() + defer c.mu.Unlock() + good := 0 + bad := 0 + for _, s := range c.Servers { + if s.Bad { + bad++ + } else { + good++ + } + } + goodServers.Set(float64(good)) + badServers.Set(float64(bad)) +} + // GetNextServer - getting next server for request func (c *Clickhouse) GetNextServer() (srv *ClickhouseServer) { c.mu.Lock() @@ -89,6 +106,7 @@ func (c *Clickhouse) Send(queryString string, data string) { // Dump - save query to file func (c *Clickhouse) Dump(params string, data string) error { + dumpCounter.Inc() if c.Dumper != nil { c.mu.Lock() defer c.mu.Unlock() @@ -119,7 +137,10 @@ func (c *Clickhouse) Run() { if status != http.StatusOK { log.Printf("Send ERROR %+v: %+v\n", status, resp) c.Dump(data.Params, data.Content) + } else { + sentCounter.Inc() } + c.DumpServers() c.wg.Done() } } @@ -161,7 +182,6 @@ func (c *Clickhouse) SendQuery(queryString string, data string) (response string } return r, status } - c.Dump(queryString, data) return "No working clickhouse servers", http.StatusBadGateway } } diff --git a/clickhouse_test.go b/clickhouse_test.go index 1f91c2b..db8275d 100644 --- a/clickhouse_test.go +++ b/clickhouse_test.go @@ -1,12 +1,11 @@ package main import ( - "github.com/stretchr/testify/assert" - "io/ioutil" "net/http" - "os" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestClickhouse_GetNextServer(t *testing.T) { @@ -53,16 +52,3 @@ func TestClickhouse_SendQuery1(t *testing.T) { s := c.GetNextServer() assert.Equal(t, false, s.Bad) } - -func TestClickhouse_Dump(t *testing.T) { - const dumpName = "dump1.dmp" - c := NewClickhouse(-1) - c.Dumper = new(FileDumper) - c.AddServer("") - c.Dump("eee", "eee") - assert.True(t, c.Empty()) - buf, err := ioutil.ReadFile(dumpName) - assert.Nil(t, err) - assert.Equal(t, "eee\neee", string(buf)) - os.Remove(dumpName) -} diff --git a/collector.go b/collector.go index c21e518..0fc5200 100644 --- a/collector.go +++ b/collector.go @@ -15,7 +15,7 @@ var regexValues = regexp.MustCompile("(?i)\\svalues\\s") type Table struct { Name string Rows []string - Count int + count int FlushCount int FlushInterval int mu sync.Mutex @@ -61,14 +61,14 @@ func (t *Table) Flush() { rows := t.Content() t.Sender.Send(t.Name, rows) t.Rows = make([]string, 0, t.FlushCount) - t.Count = 0 + t.count = 0 } // CheckFlush - check if flush is need and sends data to clickhouse func (t *Table) CheckFlush() bool { t.mu.Lock() defer t.mu.Unlock() - if t.Count > 0 { + if t.count > 0 { t.Flush() return true } @@ -77,9 +77,14 @@ func (t *Table) CheckFlush() bool { // Empty - Checks if table is empty func (t *Table) Empty() bool { + return t.GetCount() == 0 +} + +// GetCount - Checks if table is empty +func (t *Table) GetCount() int { t.mu.Lock() defer t.mu.Unlock() - return t.Count == 0 + return t.count } // RunTimer - timer for periodical savings data @@ -97,7 +102,7 @@ func (t *Table) Add(text string) { count := strings.Count(text, "\n") + 1 t.mu.Lock() defer t.mu.Unlock() - t.Count += count + t.count += count t.Rows = append(t.Rows, text) if len(t.Rows) >= t.FlushCount { t.Flush() @@ -167,6 +172,7 @@ func (c *Collector) Push(params string, content string) { } table.Add(content) c.mu.Unlock() + pushCounter.Inc() } // ParseQuery - parsing inbound query to unified format (params/query), content (query data) diff --git a/collector_test.go b/collector_test.go index 934983a..cea52aa 100644 --- a/collector_test.go +++ b/collector_test.go @@ -1,11 +1,12 @@ package main import ( - "github.com/stretchr/testify/assert" "net/url" "strings" "testing" "time" + + "github.com/stretchr/testify/assert" ) const qTitle = "INSERT INTO table3 (c1, c2, c3) FORMAT TabSeparated" @@ -34,7 +35,7 @@ func TestCollector_Push(t *testing.T) { for i := 0; i < 10400; i++ { c.Push(escTitle, qContent) } - assert.Equal(t, c.Tables[escTitle].Count, 800) + assert.Equal(t, c.Tables[escTitle].GetCount(), 800) } func BenchmarkCollector_ParseQuery(b *testing.B) { diff --git a/dump.go b/dump.go new file mode 100644 index 0000000..15bdae8 --- /dev/null +++ b/dump.go @@ -0,0 +1,144 @@ +package main + +import ( + "errors" + "io/ioutil" + "log" + "os" + "path" + "path/filepath" + "strconv" + "time" +) + +// NoDumps - signal that dumps not found +type NoDumps struct { +} + +func (e NoDumps) Error() string { + return "No dumps" +} + +// Dumper - interface for dump data +type Dumper interface { + Dump(params string, data string) error +} + +// FileDumper - dumps data to file system +type FileDumper struct { + Path string + DumpNum int + LockedFiles map[string]bool +} + +func (d *FileDumper) makePath(id string) string { + return path.Join(d.Path, id) +} + +func (d *FileDumper) checkDir() error { + _, err := os.Stat(d.Path) + if os.IsNotExist(err) { + return os.Mkdir(d.Path, 777) + } + return err +} + +// Dump - dumps data to files +func (d *FileDumper) Dump(params string, data string) error { + err := d.checkDir() + if err != nil { + return err + } + d.DumpNum++ + err = ioutil.WriteFile(path.Join(d.Path, "dump"+strconv.Itoa(d.DumpNum)+".dmp"), []byte(params+"\n"+data), 0644) + if err != nil { + log.Printf("dump error: %+v\n", err) + } + return err +} + +// GetDump - get dump file from filesystem +func (d *FileDumper) GetDump() (string, error) { + err := d.checkDir() + if err != nil { + return "", err + } + + files, err := ioutil.ReadDir(d.Path) + if err != nil { + log.Fatal(err) + } + dumpFiles := make([]string, 10) + for _, f := range files { + if filepath.Ext(f.Name()) == ".dmp" { + dumpFiles = append(dumpFiles, f.Name()) + } + } + + queuedDumps.Set(float64(len(dumpFiles))) + + for _, f := range dumpFiles { + found, _ := d.LockedFiles[f] + if !found { + return f, err + } + } + return "", &NoDumps{} +} + +// GetDumpData - get dump data from filesystem +func (d *FileDumper) GetDumpData(id string) (string, error) { + path := d.makePath(id) + s, err := ioutil.ReadFile(path) + return string(s), err +} + +// DeleteDump - get dump data from filesystem +func (d *FileDumper) DeleteDump(id string) error { + path := d.makePath(id) + err := os.Remove(path) + return err +} + +// ProcessNextDump - try to send next dump to server +func (d *FileDumper) ProcessNextDump(sender Sender) error { + f, err := d.GetDump() + if errors.Is(err, NoDumps{}) { + return err + } + if err != nil { + log.Printf("dump search error: %+v\n", err) + } + if f == "" { + return nil + } + data, err := d.GetDumpData(f) + if err != nil { + log.Printf("dump read error: %+v\n", err) + } + _, status := sender.SendQuery(data, "") + if status < 300 { + err := d.DeleteDump(f) + if err != nil { + d.LockedFiles[f] = true + log.Printf("dump delete error: %+v\n", err) + } + } + return err +} + +// Listen - reads dumps from disk and try to send it +func (d *FileDumper) Listen(sender Sender, interval int) { + d.LockedFiles = make(map[string]bool) + ticker := time.NewTicker(time.Millisecond * time.Duration(interval)) + go func() { + for range ticker.C { + for { + err := d.ProcessNextDump(sender) + if err != nil { + break + } + } + } + }() +} diff --git a/dump_test.go b/dump_test.go new file mode 100644 index 0000000..26ff5c9 --- /dev/null +++ b/dump_test.go @@ -0,0 +1,33 @@ +package main + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDump_Dump(t *testing.T) { + const dumpName = "dump1.dmp" + c := NewClickhouse(-1) + dumper := new(FileDumper) + dumpDir := "dumptest" + dumper.Path = dumpDir + c.Dumper = dumper + c.AddServer("") + c.Dump("eee", "eee") + assert.True(t, c.Empty()) + buf, err := dumper.GetDumpData(dumpName) + assert.Nil(t, err) + assert.Equal(t, "eee\neee", string(buf)) + + sender := &fakeSender{} + err = dumper.ProcessNextDump(sender) + assert.Nil(t, err) + assert.Len(t, sender.sendQueryHistory, 1) + err = dumper.ProcessNextDump(sender) + assert.True(t, errors.Is(err, NoDumps{})) + assert.Len(t, sender.sendQueryHistory, 1) + + // os.Remove(dumpDir) +} diff --git a/go.mod b/go.mod index 3816999..513ad30 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,10 @@ require ( github.com/mattn/go-colorable v0.1.0 // indirect github.com/mattn/go-isatty v0.0.4 // indirect github.com/nikepan/go-datastructures v1.0.32 + github.com/prometheus/client_golang v1.1.0 github.com/stretchr/testify v1.3.0 github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 // indirect - golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b // indirect - golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e // indirect ) + +go 1.13 diff --git a/go.sum b/go.sum index f8d6919..14e01aa 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,29 @@ +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8bbnE7CX5OEgg= github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s= github.com/labstack/gommon v0.2.8 h1:JvRqmeZcfrHC5u6uVleB4NxxNbzx6gpbJiQknDbKQu0= @@ -8,18 +32,58 @@ github.com/mattn/go-colorable v0.1.0 h1:v2XXALHHh6zHfYTJ+cSkwtyffnaOyR1MXaA91mTr github.com/mattn/go-colorable v0.1.0/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nikepan/go-datastructures v1.0.32 h1:fqj5PPpnA6NxQwp2jWUVui7XTNKnjm9nNu+/6BrI4ug= github.com/nikepan/go-datastructures v1.0.32/go.mod h1:YfZa+iHtF6k+LUqWTd0FRfO75X7ssgI3Giir/KTVMCY= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.1.0 h1:BQ53HtBmfOitExawJ6LokA4x8ov/z0SYYb0+HxJfRI8= +github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= +github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= +github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 h1:gKMu1Bf6QINDnvyZuTaACm9ofY+PRh+5vFz4oxBZeF8= github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b h1:Elez2XeF2p9uyVj0yEUDqQ56NFcDtcBNkYP7yv8YbUE= golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dzTt5OQ3vMQo9mkOIKIo= golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 29a237a..fb33d6a 100644 --- a/main.go +++ b/main.go @@ -10,11 +10,39 @@ import ( "strings" "syscall" "time" + + "github.com/prometheus/client_golang/prometheus" ) var version = "unknown" var date = "unknown" +var pushCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "ch_received_count", + }) +var sentCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "ch_sent_count", + }) +var dumpCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "ch_dump_count", + }) +var goodServers = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "ch_good_servers", + }) +var badServers = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "ch_bad_servers", + }) + +var queuedDumps = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "ch_queued_dumps", + }) + type clickhouseConfig struct { Servers []string `json:"servers"` DownTimeout int `json:"down_timeout"` @@ -84,6 +112,12 @@ func main() { } } + prometheus.MustRegister(pushCounter) + prometheus.MustRegister(sentCounter) + prometheus.MustRegister(dumpCounter) + prometheus.MustRegister(goodServers) + prometheus.MustRegister(badServers) + dumper := new(FileDumper) dumper.Path = cnf.DumpDir sender := NewClickhouse(cnf.Clickhouse.DownTimeout) diff --git a/main_test.go b/main_test.go index 50cdba2..4021f92 100644 --- a/main_test.go +++ b/main_test.go @@ -1,10 +1,62 @@ package main import ( - "github.com/stretchr/testify/assert" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "sync" "testing" + + "github.com/stretchr/testify/assert" ) +func TestMain_MultiServer(t *testing.T) { + + servers := make(map[string]string) + var mu sync.Mutex + + s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "") + req, _ := ioutil.ReadAll(r.Body) + mu.Lock() + defer mu.Unlock() + servers["s1"] = string(req) + })) + defer s1.Close() + + s2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "") + req, _ := ioutil.ReadAll(r.Body) + mu.Lock() + defer mu.Unlock() + servers["s2"] = string(req) + })) + defer s2.Close() + + sender := NewClickhouse(10) + sender.AddServer(s1.URL) + sender.AddServer(s2.URL) + collect := NewCollector(sender, 1000, 1000) + collect.AddTable("test") + collect.Push("eee", "eee") + collect.Push("fff", "fff") + collect.Push("ggg", "ggg") + + assert.False(t, collect.Empty()) + + SafeQuit(collect, sender) + + if servers["s1"] == "ggg" { + assert.Equal(t, "fff", servers["s2"]) + } else if servers["s1"] == "fff" { + assert.Equal(t, "ggg", servers["s2"]) + } + + assert.True(t, collect.Empty()) + assert.True(t, sender.Empty()) +} + func TestMain_SafeQuit(t *testing.T) { sender := &fakeSender{} collect := NewCollector(sender, 1000, 1000) diff --git a/sender.go b/sender.go index fe225f9..7d0ab75 100644 --- a/sender.go +++ b/sender.go @@ -1,6 +1,9 @@ package main -import "net/http" +import ( + "log" + "net/http" +) // Sender interface for send requests type Sender interface { @@ -11,11 +14,18 @@ type Sender interface { WaitFlush() (err error) } -type fakeSender struct{} +type fakeSender struct { + sendHistory []string + sendQueryHistory []string +} -func (s *fakeSender) Send(queryString string, data string) {} +func (s *fakeSender) Send(queryString string, data string) { + s.sendHistory = append(s.sendHistory, queryString+" "+data) +} func (s *fakeSender) SendQuery(queryString string, data string) (response string, status int) { + s.sendQueryHistory = append(s.sendQueryHistory, queryString+" "+data) + log.Printf("send query %+v\n", s.sendQueryHistory) return "", http.StatusOK } diff --git a/server.go b/server.go index 16d0289..b3582a1 100644 --- a/server.go +++ b/server.go @@ -2,10 +2,12 @@ package main import ( "context" - "github.com/labstack/echo" "io/ioutil" "log" "net/http" + + "github.com/labstack/echo" + "github.com/prometheus/client_golang/prometheus/promhttp" ) // Server - main server object @@ -59,6 +61,10 @@ func (server *Server) statusHandler(c echo.Context) error { return c.JSON(200, Status{Status: "ok"}) } +func (server *Server) metricsHandler(c echo.Context) error { + return c.JSON(200, Status{Status: "ok"}) +} + // Start - start http server func (server *Server) Start() error { return server.echo.Start(server.Listen) @@ -74,6 +80,7 @@ func InitServer(listen string, collector *Collector, debug bool) *Server { server := NewServer(listen, collector, debug) server.echo.POST("/", server.writeHandler) server.echo.GET("/status", server.statusHandler) + server.echo.GET("/metrics", echo.WrapHandler(promhttp.Handler())) return server } diff --git a/server_test.go b/server_test.go index 29adda4..cd1e6b0 100644 --- a/server_test.go +++ b/server_test.go @@ -1,13 +1,13 @@ package main import ( - "github.com/labstack/echo" - "github.com/stretchr/testify/assert" "net/http" "net/http/httptest" "strings" "testing" - "time" + + "github.com/labstack/echo" + "github.com/stretchr/testify/assert" ) func TestRunServer(t *testing.T) { @@ -39,9 +39,6 @@ func TestRunServer(t *testing.T) { e.GET("/status", server.statusHandler) status, _ = request("GET", "/status", "", e) assert.Equal(t, status, http.StatusOK) - - go main() - time.Sleep(50) } func request(method, path string, body string, e *echo.Echo) (int, string) { diff --git a/utils.go b/utils.go index eaeb29b..818dca3 100644 --- a/utils.go +++ b/utils.go @@ -2,10 +2,7 @@ package main import ( "encoding/json" - "io/ioutil" "os" - "path" - "strconv" "strings" ) @@ -24,24 +21,3 @@ func ReadJSON(fn string, v interface{}) error { func HasPrefix(s, prefix string) bool { return len(s) >= len(prefix) && strings.ToLower(s[0:len(prefix)]) == strings.ToLower(prefix) } - -// Dumper - interface for dump data -type Dumper interface { - Dump(params string, data string) error -} - -// FileDumper - dumps data to file system -type FileDumper struct { - Path string - DumpNum int -} - -// Dump - dumps data to files -func (d *FileDumper) Dump(params string, data string) error { - if _, err := os.Stat(d.Path); os.IsNotExist(err) { - os.Mkdir(d.Path, 644) - } - d.DumpNum++ - err := ioutil.WriteFile(path.Join(d.Path, "dump"+strconv.Itoa(d.DumpNum)+".dmp"), []byte(params+"\n"+data), 0644) - return err -} From a4f68e085a8aa56280e4c5eece616ec715f1cd78 Mon Sep 17 00:00:00 2001 From: PaNick Date: Sun, 13 Oct 2019 21:00:10 +0300 Subject: [PATCH 02/29] fixes of auto dump sender --- clickhouse.go | 4 ++-- collector.go | 1 - collector_test.go | 9 ++++++--- config.sample.json | 1 + dump.go | 33 ++++++++++++++++++++------------- dump_test.go | 6 +++--- go.sum | 7 +------ main.go | 15 +++++++++------ main_test.go | 4 +++- sender.go | 6 +++++- server.go | 2 +- 11 files changed, 51 insertions(+), 37 deletions(-) diff --git a/clickhouse.go b/clickhouse.go index 7cf0dba..766aee2 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -135,7 +135,7 @@ func (c *Clickhouse) Run() { data := datas[0].(ClickhouseRequest) resp, status := c.SendQuery(data.Params, data.Content) if status != http.StatusOK { - log.Printf("Send ERROR %+v: %+v\n", status, resp) + log.Printf("ERROR: Send %+v: %+v\n", status, resp) c.Dump(data.Params, data.Content) } else { sentCounter.Inc() @@ -156,7 +156,7 @@ func (c *Clickhouse) WaitFlush() (err error) { func (srv *ClickhouseServer) SendQuery(queryString string, data string) (response string, status int) { if srv.URL != "" { - log.Printf("send %+v rows to %+v of %+v\n", strings.Count(data, "\n")+1, srv.URL, queryString) + log.Printf("INFO: send %+v rows to %+v of %+v\n", strings.Count(data, "\n")+1, srv.URL, queryString) resp, err := srv.Client.Post(srv.URL+"?"+queryString, "", strings.NewReader(data)) if err != nil { diff --git a/collector.go b/collector.go index 0fc5200..0dc2782 100644 --- a/collector.go +++ b/collector.go @@ -167,7 +167,6 @@ func (c *Collector) Push(params string, content string) { c.mu.Lock() table, ok = c.Tables[params] if !ok { - //log.Printf("'%+v'\n", params) table = c.addTable(params) } table.Add(content) diff --git a/collector_test.go b/collector_test.go index cea52aa..ae5cd8c 100644 --- a/collector_test.go +++ b/collector_test.go @@ -35,7 +35,7 @@ func TestCollector_Push(t *testing.T) { for i := 0; i < 10400; i++ { c.Push(escTitle, qContent) } - assert.Equal(t, c.Tables[escTitle].GetCount(), 800) + assert.Equal(t, 800, c.Tables[escTitle].GetCount()) } func BenchmarkCollector_ParseQuery(b *testing.B) { @@ -123,11 +123,14 @@ func TestCollector_ParseQuery(t *testing.T) { } func TestTable_CheckFlush(t *testing.T) { - c := NewCollector(&fakeSender{}, 1000, 1) + c := NewCollector(&fakeSender{}, 1000, 1000) c.Push(qTitle, qContent) + count := 0 for !c.Tables[qTitle].Empty() { - time.Sleep(10) + time.Sleep(time.Millisecond * time.Duration(100)) + count++ } + assert.True(t, count >= 9) } func TestCollector_FlushAll(t *testing.T) { diff --git a/config.sample.json b/config.sample.json index faaea89..9d489e1 100644 --- a/config.sample.json +++ b/config.sample.json @@ -2,6 +2,7 @@ "listen": ":8124", "flush_count": 10000, "flush_interval": 1000, + "dump_check_interval": 30000, "debug": false, "dump_dir": "dumps", "clickhouse": { diff --git a/dump.go b/dump.go index 15bdae8..8c10154 100644 --- a/dump.go +++ b/dump.go @@ -8,16 +8,14 @@ import ( "path" "path/filepath" "strconv" + "sync" "time" ) -// NoDumps - signal that dumps not found -type NoDumps struct { -} +const defaultDumpCheckInterval = 30000 -func (e NoDumps) Error() string { - return "No dumps" -} +// ErrNoDumps - signal that dumps not found +var ErrNoDumps = errors.New("No dumps") // Dumper - interface for dump data type Dumper interface { @@ -29,6 +27,7 @@ type FileDumper struct { Path string DumpNum int LockedFiles map[string]bool + mu sync.Mutex } func (d *FileDumper) makePath(id string) string { @@ -45,6 +44,8 @@ func (d *FileDumper) checkDir() error { // Dump - dumps data to files func (d *FileDumper) Dump(params string, data string) error { + d.mu.Lock() + defer d.mu.Unlock() err := d.checkDir() if err != nil { return err @@ -52,7 +53,7 @@ func (d *FileDumper) Dump(params string, data string) error { d.DumpNum++ err = ioutil.WriteFile(path.Join(d.Path, "dump"+strconv.Itoa(d.DumpNum)+".dmp"), []byte(params+"\n"+data), 0644) if err != nil { - log.Printf("dump error: %+v\n", err) + log.Printf("ERROR: dump to file: %+v\n", err) } return err } @@ -68,7 +69,7 @@ func (d *FileDumper) GetDump() (string, error) { if err != nil { log.Fatal(err) } - dumpFiles := make([]string, 10) + dumpFiles := make([]string, 0) for _, f := range files { if filepath.Ext(f.Name()) == ".dmp" { dumpFiles = append(dumpFiles, f.Name()) @@ -83,7 +84,7 @@ func (d *FileDumper) GetDump() (string, error) { return f, err } } - return "", &NoDumps{} + return "", ErrNoDumps } // GetDumpData - get dump data from filesystem @@ -102,26 +103,29 @@ func (d *FileDumper) DeleteDump(id string) error { // ProcessNextDump - try to send next dump to server func (d *FileDumper) ProcessNextDump(sender Sender) error { + d.mu.Lock() + defer d.mu.Unlock() f, err := d.GetDump() - if errors.Is(err, NoDumps{}) { + if errors.Is(err, ErrNoDumps) { return err } if err != nil { - log.Printf("dump search error: %+v\n", err) + log.Printf("ERROR: dump search: %+v\n", err) } if f == "" { return nil } data, err := d.GetDumpData(f) if err != nil { - log.Printf("dump read error: %+v\n", err) + log.Printf("ERROR: dump read: %+v\n", err) } _, status := sender.SendQuery(data, "") if status < 300 { + log.Printf("INFO: dump sended: %+v\n", f) err := d.DeleteDump(f) if err != nil { d.LockedFiles[f] = true - log.Printf("dump delete error: %+v\n", err) + log.Printf("ERROR: dump delete: %+v\n", err) } } return err @@ -130,6 +134,9 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { // Listen - reads dumps from disk and try to send it func (d *FileDumper) Listen(sender Sender, interval int) { d.LockedFiles = make(map[string]bool) + if interval == 0 { + interval = defaultDumpCheckInterval + } ticker := time.NewTicker(time.Millisecond * time.Duration(interval)) go func() { for range ticker.C { diff --git a/dump_test.go b/dump_test.go index 26ff5c9..1814412 100644 --- a/dump_test.go +++ b/dump_test.go @@ -2,6 +2,7 @@ package main import ( "errors" + "os" "testing" "github.com/stretchr/testify/assert" @@ -26,8 +27,7 @@ func TestDump_Dump(t *testing.T) { assert.Nil(t, err) assert.Len(t, sender.sendQueryHistory, 1) err = dumper.ProcessNextDump(sender) - assert.True(t, errors.Is(err, NoDumps{})) + assert.True(t, errors.Is(err, ErrNoDumps)) assert.Len(t, sender.sendQueryHistory, 1) - - // os.Remove(dumpDir) + os.Remove(dumpDir) } diff --git a/go.sum b/go.sum index 14e01aa..ee99ecc 100644 --- a/go.sum +++ b/go.sum @@ -4,7 +4,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -56,7 +55,6 @@ github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkp github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -69,8 +67,6 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 h1:gKMu1Bf6QINDnvyZuTaACm9ofY+PRh+5vFz4oxBZeF8= github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b h1:Elez2XeF2p9uyVj0yEUDqQ56NFcDtcBNkYP7yv8YbUE= -golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -79,9 +75,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e h1:njOxP/wVblhCLIUhjHXf6X+dzTt5OQ3vMQo9mkOIKIo= -golang.org/x/sys v0.0.0-20181206074257-70b957f3b65e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/main.go b/main.go index fb33d6a..ade50d8 100644 --- a/main.go +++ b/main.go @@ -49,12 +49,13 @@ type clickhouseConfig struct { } type config struct { - Listen string `json:"listen"` - Clickhouse clickhouseConfig `json:"clickhouse"` - FlushCount int `json:"flush_count"` - FlushInterval int `json:"flush_interval"` - DumpDir string `json:"dump_dir"` - Debug bool `json:"debug"` + Listen string `json:"listen"` + Clickhouse clickhouseConfig `json:"clickhouse"` + FlushCount int `json:"flush_count"` + FlushInterval int `json:"flush_interval"` + DumpCheckInterval int `json:"dump_check_interval"` + DumpDir string `json:"dump_dir"` + Debug bool `json:"debug"` } // SafeQuit - safe prepare to quit @@ -148,6 +149,8 @@ func main() { } }() + dumper.Listen(sender, cnf.DumpCheckInterval) + err = srv.Start() if err != nil { log.Printf("ListenAndServe: %+v\n", err) diff --git a/main_test.go b/main_test.go index 4021f92..ab0e2ed 100644 --- a/main_test.go +++ b/main_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "sync" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -46,6 +47,7 @@ func TestMain_MultiServer(t *testing.T) { assert.False(t, collect.Empty()) SafeQuit(collect, sender) + time.Sleep(100) // wait for http servers process requests if servers["s1"] == "ggg" { assert.Equal(t, "fff", servers["s2"]) @@ -61,7 +63,7 @@ func TestMain_SafeQuit(t *testing.T) { sender := &fakeSender{} collect := NewCollector(sender, 1000, 1000) collect.AddTable("test") - collect.Push("eee", "eee") + collect.Push("sss", "sss") assert.False(t, collect.Empty()) diff --git a/sender.go b/sender.go index 7d0ab75..f8f3441 100644 --- a/sender.go +++ b/sender.go @@ -3,6 +3,7 @@ package main import ( "log" "net/http" + "sync" ) // Sender interface for send requests @@ -17,15 +18,18 @@ type Sender interface { type fakeSender struct { sendHistory []string sendQueryHistory []string + mu sync.Mutex } func (s *fakeSender) Send(queryString string, data string) { + s.mu.Lock() + defer s.mu.Unlock() s.sendHistory = append(s.sendHistory, queryString+" "+data) } func (s *fakeSender) SendQuery(queryString string, data string) (response string, status int) { s.sendQueryHistory = append(s.sendQueryHistory, queryString+" "+data) - log.Printf("send query %+v\n", s.sendQueryHistory) + log.Printf("DEBUG: send query: %+v\n", s.sendQueryHistory) return "", http.StatusOK } diff --git a/server.go b/server.go index b3582a1..0be822f 100644 --- a/server.go +++ b/server.go @@ -36,7 +36,7 @@ func (server *Server) writeHandler(c echo.Context) error { s := string(q) if server.Debug { - log.Printf("query %+v %+v\n", c.QueryString(), s) + log.Printf("DEBUG: query %+v %+v\n", c.QueryString(), s) } qs := c.QueryString() From b247d300d78710e61ae6fbeda656a2eb15e06845 Mon Sep 17 00:00:00 2001 From: PaNick Date: Sun, 13 Oct 2019 21:08:47 +0300 Subject: [PATCH 03/29] update go version --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3ee473f..f8612b9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: go sudo: false go: - - 1.12.4 + - 1.13.1 - tip env: @@ -28,4 +28,4 @@ deploy: on: tags: true repo: nikepan/clickhouse-bulk - condition: $TRAVIS_GO_VERSION =~ ^1\.12\.[0-9]+$ + condition: $TRAVIS_GO_VERSION =~ ^1\.13\.[0-9]+$ From 713fdc185fa09f3555ed8149394bdecee1585b00 Mon Sep 17 00:00:00 2001 From: PaNick Date: Sun, 13 Oct 2019 21:09:21 +0300 Subject: [PATCH 04/29] update go version --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7636194..3a26182 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ or [Use docker image](https://hub.docker.com/r/nikepan/clickhouse-bulk/) -or from sources (Go 1.11+): +or from sources (Go 1.13+): ```text git clone https://github.com/nikepan/clickhouse-bulk From c116fb9fd1420be62322175a4aaabb5850e81f1f Mon Sep 17 00:00:00 2001 From: PaNick Date: Mon, 14 Oct 2019 23:34:44 +0300 Subject: [PATCH 05/29] fix test --- go.sum | 1 + main_test.go | 13 ++++--------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/go.sum b/go.sum index ee99ecc..8306b0d 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,7 @@ github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkp github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main_test.go b/main_test.go index ab0e2ed..4e586eb 100644 --- a/main_test.go +++ b/main_test.go @@ -14,7 +14,7 @@ import ( func TestMain_MultiServer(t *testing.T) { - servers := make(map[string]string) + received := make([]string, 0) var mu sync.Mutex s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -22,7 +22,7 @@ func TestMain_MultiServer(t *testing.T) { req, _ := ioutil.ReadAll(r.Body) mu.Lock() defer mu.Unlock() - servers["s1"] = string(req) + received = append(received, string(req)) })) defer s1.Close() @@ -31,7 +31,7 @@ func TestMain_MultiServer(t *testing.T) { req, _ := ioutil.ReadAll(r.Body) mu.Lock() defer mu.Unlock() - servers["s2"] = string(req) + received = append(received, string(req)) })) defer s2.Close() @@ -49,12 +49,7 @@ func TestMain_MultiServer(t *testing.T) { SafeQuit(collect, sender) time.Sleep(100) // wait for http servers process requests - if servers["s1"] == "ggg" { - assert.Equal(t, "fff", servers["s2"]) - } else if servers["s1"] == "fff" { - assert.Equal(t, "ggg", servers["s2"]) - } - + assert.Equal(t, 3, len(received)) assert.True(t, collect.Empty()) assert.True(t, sender.Empty()) } From fd871ffeda64f2859c2eff40c29124f1fa9c299f Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Tue, 15 Oct 2019 16:48:40 +0300 Subject: [PATCH 06/29] fix test --- dump.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dump.go b/dump.go index 8c10154..3496562 100644 --- a/dump.go +++ b/dump.go @@ -37,7 +37,7 @@ func (d *FileDumper) makePath(id string) string { func (d *FileDumper) checkDir() error { _, err := os.Stat(d.Path) if os.IsNotExist(err) { - return os.Mkdir(d.Path, 777) + return os.Mkdir(d.Path, 0777) } return err } From 97105efd03511f462d594a01c80deeb8ea8e3d9a Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Tue, 15 Oct 2019 19:08:32 +0300 Subject: [PATCH 07/29] dumper fixes --- README.md | 2 ++ clickhouse.go | 23 +++++++++++++++-------- clickhouse_test.go | 8 ++++---- config.sample.json | 3 ++- dump.go | 36 +++++++++++++++++++++++++++--------- dump_test.go | 8 +++----- main.go | 10 +++++----- main_test.go | 2 +- 8 files changed, 59 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 3a26182..1868e30 100644 --- a/README.md +++ b/README.md @@ -58,10 +58,12 @@ INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6') "listen": ":8124", "flush_count": 10000, // check by \n char "flush_interval": 1000, // milliseconds + "dump_check_interval": 30, // interval for try to send dumps (seconds) "debug": false, // log incoming requests "dump_dir": "dumps", // directory for dump unsended data (if clickhouse errors) "clickhouse": { "down_timeout": 300, // wait if server in down (seconds) + "connect_timeout": 10, // wait for server connect (seconds) "servers": [ "http://127.0.0.1:8123" ] diff --git a/clickhouse.go b/clickhouse.go index 766aee2..15e0016 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -21,12 +21,13 @@ type ClickhouseServer struct { // Clickhouse - main clickhouse sender object type Clickhouse struct { - Servers []*ClickhouseServer - Queue *queue.Queue - mu sync.Mutex - DownTimeout int - Dumper Dumper - wg sync.WaitGroup + Servers []*ClickhouseServer + Queue *queue.Queue + mu sync.Mutex + DownTimeout int + ConnectTimeout int + Dumper Dumper + wg sync.WaitGroup } // ClickhouseRequest - request struct for queue @@ -36,9 +37,13 @@ type ClickhouseRequest struct { } // NewClickhouse - get clickhouse object -func NewClickhouse(downTimeout int) (c *Clickhouse) { +func NewClickhouse(downTimeout int, connectTimeout int) (c *Clickhouse) { c = new(Clickhouse) c.DownTimeout = downTimeout + c.ConnectTimeout = connectTimeout + if c.ConnectTimeout > 0 { + c.ConnectTimeout = 10 + } c.Servers = make([]*ClickhouseServer, 0) c.Queue = queue.New(1000) go c.Run() @@ -49,7 +54,9 @@ func NewClickhouse(downTimeout int) (c *Clickhouse) { func (c *Clickhouse) AddServer(url string) { c.mu.Lock() defer c.mu.Unlock() - c.Servers = append(c.Servers, &ClickhouseServer{URL: url, Client: &http.Client{}}) + c.Servers = append(c.Servers, &ClickhouseServer{URL: url, Client: &http.Client{ + Timeout: time.Second * time.Duration(c.ConnectTimeout), + }}) } // DumpServers - dump servers state to prometheus diff --git a/clickhouse_test.go b/clickhouse_test.go index db8275d..544f8bd 100644 --- a/clickhouse_test.go +++ b/clickhouse_test.go @@ -9,7 +9,7 @@ import ( ) func TestClickhouse_GetNextServer(t *testing.T) { - c := NewClickhouse(300) + c := NewClickhouse(300, 10) c.AddServer("") c.AddServer("http://127.0.0.1:8124") c.AddServer("http://127.0.0.1:8125") @@ -27,7 +27,7 @@ func TestClickhouse_GetNextServer(t *testing.T) { } func TestClickhouse_Send(t *testing.T) { - c := NewClickhouse(300) + c := NewClickhouse(300, 10) c.AddServer("") c.Send("", "") for !c.Queue.Empty() { @@ -36,7 +36,7 @@ func TestClickhouse_Send(t *testing.T) { } func TestClickhouse_SendQuery(t *testing.T) { - c := NewClickhouse(300) + c := NewClickhouse(300, 10) c.AddServer("") c.GetNextServer() c.Servers[0].Bad = true @@ -45,7 +45,7 @@ func TestClickhouse_SendQuery(t *testing.T) { } func TestClickhouse_SendQuery1(t *testing.T) { - c := NewClickhouse(-1) + c := NewClickhouse(-1, 10) c.AddServer("") c.GetNextServer() c.Servers[0].Bad = true diff --git a/config.sample.json b/config.sample.json index 9d489e1..93ede58 100644 --- a/config.sample.json +++ b/config.sample.json @@ -2,11 +2,12 @@ "listen": ":8124", "flush_count": 10000, "flush_interval": 1000, - "dump_check_interval": 30000, + "dump_check_interval": 30, "debug": false, "dump_dir": "dumps", "clickhouse": { "down_timeout": 300, + "connect_timeout": 10, "servers": [ "http://127.0.0.1:8123" ] diff --git a/dump.go b/dump.go index 3496562..c058a4f 100644 --- a/dump.go +++ b/dump.go @@ -2,6 +2,7 @@ package main import ( "errors" + "fmt" "io/ioutil" "log" "os" @@ -12,7 +13,7 @@ import ( "time" ) -const defaultDumpCheckInterval = 30000 +const defaultDumpCheckInterval = 30 // ErrNoDumps - signal that dumps not found var ErrNoDumps = errors.New("No dumps") @@ -25,6 +26,7 @@ type Dumper interface { // FileDumper - dumps data to file system type FileDumper struct { Path string + DumpPrefix string DumpNum int LockedFiles map[string]bool mu sync.Mutex @@ -42,6 +44,18 @@ func (d *FileDumper) checkDir() error { return err } +func (d *FileDumper) dumpName(num int) string { + return "dump" + d.DumpPrefix + "-" + strconv.Itoa(num) + ".dmp" +} + +// NewDumper - create new dumper +func NewDumper(path string) *FileDumper { + d := new(FileDumper) + d.Path = path + d.DumpPrefix = time.Now().Format("20060102150405") + return d +} + // Dump - dumps data to files func (d *FileDumper) Dump(params string, data string) error { d.mu.Lock() @@ -51,7 +65,9 @@ func (d *FileDumper) Dump(params string, data string) error { return err } d.DumpNum++ - err = ioutil.WriteFile(path.Join(d.Path, "dump"+strconv.Itoa(d.DumpNum)+".dmp"), []byte(params+"\n"+data), 0644) + err = ioutil.WriteFile( + path.Join(d.Path, d.dumpName(d.DumpNum)), []byte(params+"\n"+data), 0644, + ) if err != nil { log.Printf("ERROR: dump to file: %+v\n", err) } @@ -120,13 +136,14 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { log.Printf("ERROR: dump read: %+v\n", err) } _, status := sender.SendQuery(data, "") - if status < 300 { - log.Printf("INFO: dump sended: %+v\n", f) - err := d.DeleteDump(f) - if err != nil { - d.LockedFiles[f] = true - log.Printf("ERROR: dump delete: %+v\n", err) - } + if status > 299 { + return fmt.Errorf("server status %+v", status) + } + log.Printf("INFO: dump sended: %+v\n", f) + err = d.DeleteDump(f) + if err != nil { + d.LockedFiles[f] = true + log.Printf("ERROR: dump delete: %+v\n", err) } return err } @@ -143,6 +160,7 @@ func (d *FileDumper) Listen(sender Sender, interval int) { for { err := d.ProcessNextDump(sender) if err != nil { + log.Printf("WARNING: %+v\n", err) break } } diff --git a/dump_test.go b/dump_test.go index 1814412..f33958c 100644 --- a/dump_test.go +++ b/dump_test.go @@ -9,16 +9,14 @@ import ( ) func TestDump_Dump(t *testing.T) { - const dumpName = "dump1.dmp" - c := NewClickhouse(-1) - dumper := new(FileDumper) + c := NewClickhouse(-1, 10) dumpDir := "dumptest" - dumper.Path = dumpDir + dumper := NewDumper(dumpDir) c.Dumper = dumper c.AddServer("") c.Dump("eee", "eee") assert.True(t, c.Empty()) - buf, err := dumper.GetDumpData(dumpName) + buf, err := dumper.GetDumpData(dumper.dumpName(1)) assert.Nil(t, err) assert.Equal(t, "eee\neee", string(buf)) diff --git a/main.go b/main.go index ade50d8..aacb6cd 100644 --- a/main.go +++ b/main.go @@ -44,8 +44,9 @@ var queuedDumps = prometheus.NewGauge( }) type clickhouseConfig struct { - Servers []string `json:"servers"` - DownTimeout int `json:"down_timeout"` + Servers []string `json:"servers"` + DownTimeout int `json:"down_timeout"` + ConnectTimeout int `json:"connect_timeout"` } type config struct { @@ -119,9 +120,8 @@ func main() { prometheus.MustRegister(goodServers) prometheus.MustRegister(badServers) - dumper := new(FileDumper) - dumper.Path = cnf.DumpDir - sender := NewClickhouse(cnf.Clickhouse.DownTimeout) + dumper := NewDumper(cnf.DumpDir) + sender := NewClickhouse(cnf.Clickhouse.DownTimeout, cnf.Clickhouse.ConnectTimeout) sender.Dumper = dumper for _, url := range cnf.Clickhouse.Servers { sender.AddServer(url) diff --git a/main_test.go b/main_test.go index 4e586eb..8ebeda5 100644 --- a/main_test.go +++ b/main_test.go @@ -35,7 +35,7 @@ func TestMain_MultiServer(t *testing.T) { })) defer s2.Close() - sender := NewClickhouse(10) + sender := NewClickhouse(10, 10) sender.AddServer(s1.URL) sender.AddServer(s2.URL) collect := NewCollector(sender, 1000, 1000) From d82d0a756470ccc6484b9d076f55e044f3ff18bf Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Tue, 15 Oct 2019 19:41:08 +0300 Subject: [PATCH 08/29] improve coverage --- dump.go | 2 +- dump_test.go | 8 ++++++++ main_test.go | 6 ++++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/dump.go b/dump.go index c058a4f..bd80416 100644 --- a/dump.go +++ b/dump.go @@ -154,7 +154,7 @@ func (d *FileDumper) Listen(sender Sender, interval int) { if interval == 0 { interval = defaultDumpCheckInterval } - ticker := time.NewTicker(time.Millisecond * time.Duration(interval)) + ticker := time.NewTicker(time.Second * time.Duration(interval)) go func() { for range ticker.C { for { diff --git a/dump_test.go b/dump_test.go index f33958c..63c454c 100644 --- a/dump_test.go +++ b/dump_test.go @@ -4,6 +4,7 @@ import ( "errors" "os" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -27,5 +28,12 @@ func TestDump_Dump(t *testing.T) { err = dumper.ProcessNextDump(sender) assert.True(t, errors.Is(err, ErrNoDumps)) assert.Len(t, sender.sendQueryHistory, 1) + + dumper.Listen(sender, 1) + c.Dump("eee", "eee") + time.Sleep(time.Second) + err = dumper.ProcessNextDump(sender) + assert.True(t, errors.Is(err, ErrNoDumps)) + os.Remove(dumpDir) } diff --git a/main_test.go b/main_test.go index 8ebeda5..2c1c4e8 100644 --- a/main_test.go +++ b/main_test.go @@ -67,3 +67,9 @@ func TestMain_SafeQuit(t *testing.T) { assert.True(t, collect.Empty()) assert.True(t, sender.Empty()) } + +func TestMain_ReadJSON(t *testing.T) { + cnf := config{} + err := ReadJSON("config.sample.json", &cnf) + assert.Nil(t, err) +} From e350f037e1c8b9e2f5ef786e739630a89a9040c0 Mon Sep 17 00:00:00 2001 From: PaNick Date: Tue, 15 Oct 2019 23:03:23 +0300 Subject: [PATCH 09/29] improve coverage and refactoring --- dump.go | 4 +- main.go | 136 ++----------------------------------------------- main_test.go | 75 --------------------------- metrics.go | 41 +++++++++++++++ server.go | 62 ++++++++++++++++++++-- server_test.go | 95 ++++++++++++++++++++++++++++++---- utils.go | 58 +++++++++++++++++++++ 7 files changed, 248 insertions(+), 223 deletions(-) delete mode 100644 main_test.go create mode 100644 metrics.go diff --git a/dump.go b/dump.go index bd80416..004aca4 100644 --- a/dump.go +++ b/dump.go @@ -160,7 +160,9 @@ func (d *FileDumper) Listen(sender Sender, interval int) { for { err := d.ProcessNextDump(sender) if err != nil { - log.Printf("WARNING: %+v\n", err) + if !errors.Is(err, ErrNoDumps) { + log.Printf("WARNING: %+v\n", err) + } break } } diff --git a/main.go b/main.go index aacb6cd..5d4385c 100644 --- a/main.go +++ b/main.go @@ -1,76 +1,14 @@ package main import ( - "context" "flag" "log" "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - - "github.com/prometheus/client_golang/prometheus" ) var version = "unknown" var date = "unknown" -var pushCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "ch_received_count", - }) -var sentCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "ch_sent_count", - }) -var dumpCounter = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "ch_dump_count", - }) -var goodServers = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "ch_good_servers", - }) -var badServers = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "ch_bad_servers", - }) - -var queuedDumps = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "ch_queued_dumps", - }) - -type clickhouseConfig struct { - Servers []string `json:"servers"` - DownTimeout int `json:"down_timeout"` - ConnectTimeout int `json:"connect_timeout"` -} - -type config struct { - Listen string `json:"listen"` - Clickhouse clickhouseConfig `json:"clickhouse"` - FlushCount int `json:"flush_count"` - FlushInterval int `json:"flush_interval"` - DumpCheckInterval int `json:"dump_check_interval"` - DumpDir string `json:"dump_dir"` - Debug bool `json:"debug"` -} - -// SafeQuit - safe prepare to quit -func SafeQuit(collect *Collector, sender Sender) { - collect.FlushAll() - if count := sender.Len(); count > 0 { - log.Printf("Sending %+v tables\n", count) - } - for !sender.Empty() && !collect.Empty() { - collect.WaitFlush() - } - collect.WaitFlush() -} - func main() { log.SetOutput(os.Stdout) @@ -84,77 +22,9 @@ func main() { return } - cnf := config{} - err := ReadJSON(*configFile, &cnf) - if err != nil { - log.Printf("Config file %+v not found. Use config.sample.json\n", *configFile) - err := ReadJSON("config.sample.json", &cnf) - if err != nil { - log.Fatalf("Read config: %+v\n", err.Error()) - } - } - - serversList := os.Getenv("CLICKHOUSE_SERVERS") - if serversList != "" { - cnf.Clickhouse.Servers = strings.Split(serversList, ",") - log.Printf("use servers: %+v\n", serversList) - } - flushCount := os.Getenv("CLICKHOUSE_FLUSH_COUNT") - if flushCount != "" { - cnf.FlushCount, err = strconv.Atoi(flushCount) - if err != nil { - log.Fatalf("Wrong flush count env: %+v\n", err.Error()) - } - } - flushInterval := os.Getenv("CLICKHOUSE_FLUSH_INTERVAL") - if flushInterval != "" { - cnf.FlushInterval, err = strconv.Atoi(flushInterval) - if err != nil { - log.Fatalf("Wrong flush interval env: %+v\n", err.Error()) - } - } - - prometheus.MustRegister(pushCounter) - prometheus.MustRegister(sentCounter) - prometheus.MustRegister(dumpCounter) - prometheus.MustRegister(goodServers) - prometheus.MustRegister(badServers) - - dumper := NewDumper(cnf.DumpDir) - sender := NewClickhouse(cnf.Clickhouse.DownTimeout, cnf.Clickhouse.ConnectTimeout) - sender.Dumper = dumper - for _, url := range cnf.Clickhouse.Servers { - sender.AddServer(url) - } - - collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval) - - // send collected data on SIGTERM and exit - signals := make(chan os.Signal) - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - - srv := InitServer(cnf.Listen, collect, cnf.Debug) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - go func() { - for { - _ = <-signals - log.Printf("STOP signal\n") - if err := srv.Shutdown(ctx); err != nil { - log.Printf("Shutdown error %+v\n", err) - SafeQuit(collect, sender) - os.Exit(1) - } - } - }() - - dumper.Listen(sender, cnf.DumpCheckInterval) - - err = srv.Start() + cnf, err := ReadConfig(*configFile) if err != nil { - log.Printf("ListenAndServe: %+v\n", err) - SafeQuit(collect, sender) - os.Exit(1) + log.Fatalf("ERROR: %+v\n", err) } + RunServer(cnf) } diff --git a/main_test.go b/main_test.go deleted file mode 100644 index 2c1c4e8..0000000 --- a/main_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package main - -import ( - "fmt" - "io/ioutil" - "net/http" - "net/http/httptest" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestMain_MultiServer(t *testing.T) { - - received := make([]string, 0) - var mu sync.Mutex - - s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, "") - req, _ := ioutil.ReadAll(r.Body) - mu.Lock() - defer mu.Unlock() - received = append(received, string(req)) - })) - defer s1.Close() - - s2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, "") - req, _ := ioutil.ReadAll(r.Body) - mu.Lock() - defer mu.Unlock() - received = append(received, string(req)) - })) - defer s2.Close() - - sender := NewClickhouse(10, 10) - sender.AddServer(s1.URL) - sender.AddServer(s2.URL) - collect := NewCollector(sender, 1000, 1000) - collect.AddTable("test") - collect.Push("eee", "eee") - collect.Push("fff", "fff") - collect.Push("ggg", "ggg") - - assert.False(t, collect.Empty()) - - SafeQuit(collect, sender) - time.Sleep(100) // wait for http servers process requests - - assert.Equal(t, 3, len(received)) - assert.True(t, collect.Empty()) - assert.True(t, sender.Empty()) -} - -func TestMain_SafeQuit(t *testing.T) { - sender := &fakeSender{} - collect := NewCollector(sender, 1000, 1000) - collect.AddTable("test") - collect.Push("sss", "sss") - - assert.False(t, collect.Empty()) - - SafeQuit(collect, sender) - - assert.True(t, collect.Empty()) - assert.True(t, sender.Empty()) -} - -func TestMain_ReadJSON(t *testing.T) { - cnf := config{} - err := ReadJSON("config.sample.json", &cnf) - assert.Nil(t, err) -} diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..7e2602b --- /dev/null +++ b/metrics.go @@ -0,0 +1,41 @@ +package main + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var pushCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "ch_received_count", + }) +var sentCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "ch_sent_count", + }) +var dumpCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "ch_dump_count", + }) +var goodServers = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "ch_good_servers", + }) +var badServers = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "ch_bad_servers", + }) + +var queuedDumps = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "ch_queued_dumps", + }) + +// InitMetrics - init prometheus metrics +func InitMetrics() { + + prometheus.MustRegister(pushCounter) + prometheus.MustRegister(sentCounter) + prometheus.MustRegister(dumpCounter) + prometheus.MustRegister(goodServers) + prometheus.MustRegister(badServers) +} diff --git a/server.go b/server.go index 0be822f..a5dcbee 100644 --- a/server.go +++ b/server.go @@ -5,6 +5,10 @@ import ( "io/ioutil" "log" "net/http" + "os" + "os/signal" + "syscall" + "time" "github.com/labstack/echo" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -61,10 +65,6 @@ func (server *Server) statusHandler(c echo.Context) error { return c.JSON(200, Status{Status: "ok"}) } -func (server *Server) metricsHandler(c echo.Context) error { - return c.JSON(200, Status{Status: "ok"}) -} - // Start - start http server func (server *Server) Start() error { return server.echo.Start(server.Listen) @@ -84,3 +84,57 @@ func InitServer(listen string, collector *Collector, debug bool) *Server { return server } + +// SafeQuit - safe prepare to quit +func SafeQuit(collect *Collector, sender Sender) { + collect.FlushAll() + if count := sender.Len(); count > 0 { + log.Printf("Sending %+v tables\n", count) + } + for !sender.Empty() && !collect.Empty() { + collect.WaitFlush() + } + collect.WaitFlush() +} + +// RunServer - run all +func RunServer(cnf Config) { + InitMetrics() + dumper := NewDumper(cnf.DumpDir) + sender := NewClickhouse(cnf.Clickhouse.DownTimeout, cnf.Clickhouse.ConnectTimeout) + sender.Dumper = dumper + for _, url := range cnf.Clickhouse.Servers { + sender.AddServer(url) + } + + collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval) + + // send collected data on SIGTERM and exit + signals := make(chan os.Signal) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + + srv := InitServer(cnf.Listen, collect, cnf.Debug) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { + for { + _ = <-signals + log.Printf("STOP signal\n") + if err := srv.Shutdown(ctx); err != nil { + log.Printf("Shutdown error %+v\n", err) + SafeQuit(collect, sender) + os.Exit(1) + } + } + }() + + dumper.Listen(sender, cnf.DumpCheckInterval) + + err := srv.Start() + if err != nil { + log.Printf("ListenAndServe: %+v\n", err) + SafeQuit(collect, sender) + os.Exit(1) + } +} diff --git a/server_test.go b/server_test.go index cd1e6b0..cf79ca1 100644 --- a/server_test.go +++ b/server_test.go @@ -1,44 +1,119 @@ package main import ( + "context" + "fmt" + "io/ioutil" "net/http" "net/http/httptest" "strings" + "sync" "testing" + "time" "github.com/labstack/echo" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/assert" ) func TestRunServer(t *testing.T) { collector := NewCollector(&fakeSender{}, 1000, 1000) - server := NewServer("", collector, false) - e := echo.New() - e.POST("/", server.writeHandler) + server := InitServer("", collector, false) + go server.Start() + server.echo.POST("/", server.writeHandler) - status, resp := request("POST", "/", "", e) + status, resp := request("POST", "/", "", server.echo) assert.Equal(t, status, http.StatusOK) assert.Equal(t, resp, "") - status, resp = request("POST", "/?query="+escSelect, "", e) + status, resp = request("POST", "/?query="+escSelect, "", server.echo) assert.Equal(t, status, http.StatusOK) assert.Equal(t, resp, "") - status, resp = request("POST", "/?query="+escTitle, qContent, e) + status, resp = request("POST", "/?query="+escTitle, qContent, server.echo) assert.Equal(t, status, http.StatusOK) assert.Equal(t, resp, "") - status, resp = authRequest("POST", "default", "", "/?query="+escTitle, qContent, e) + status, resp = authRequest("POST", "default", "", "/?query="+escTitle, qContent, server.echo) assert.Equal(t, status, http.StatusOK) assert.Equal(t, resp, "") - status, resp = authRequest("POST", "default", "", "/", "", e) + status, resp = authRequest("POST", "default", "", "/", "", server.echo) assert.Equal(t, status, http.StatusOK) assert.Equal(t, resp, "") - e.GET("/status", server.statusHandler) - status, _ = request("GET", "/status", "", e) + server.echo.GET("/status", server.statusHandler) + status, _ = request("GET", "/status", "", server.echo) assert.Equal(t, status, http.StatusOK) + + server.echo.GET("/metrics", echo.WrapHandler(promhttp.Handler())) + status, _ = request("GET", "/metrics", "", server.echo) + assert.Equal(t, status, http.StatusOK) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + server.Shutdown(ctx) +} + +func TestServer_SafeQuit(t *testing.T) { + sender := &fakeSender{} + collect := NewCollector(sender, 1000, 1000) + collect.AddTable("test") + collect.Push("sss", "sss") + + assert.False(t, collect.Empty()) + + SafeQuit(collect, sender) + + assert.True(t, collect.Empty()) + assert.True(t, sender.Empty()) +} + +func TestServer_MultiServer(t *testing.T) { + + received := make([]string, 0) + var mu sync.Mutex + + s1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "") + req, _ := ioutil.ReadAll(r.Body) + mu.Lock() + defer mu.Unlock() + received = append(received, string(req)) + })) + defer s1.Close() + + s2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "") + req, _ := ioutil.ReadAll(r.Body) + mu.Lock() + defer mu.Unlock() + received = append(received, string(req)) + })) + defer s2.Close() + + sender := NewClickhouse(10, 10) + sender.AddServer(s1.URL) + sender.AddServer(s2.URL) + collect := NewCollector(sender, 1000, 1000) + collect.AddTable("test") + collect.Push("eee", "eee") + collect.Push("fff", "fff") + collect.Push("ggg", "ggg") + + assert.False(t, collect.Empty()) + + SafeQuit(collect, sender) + time.Sleep(100) // wait for http servers process requests + + assert.Equal(t, 3, len(received)) + assert.True(t, collect.Empty()) + assert.True(t, sender.Empty()) + + cnf, err := ReadConfig("wrong_config.json") + assert.Nil(t, err) + go RunServer(cnf) + time.Sleep(50) } func request(method, path string, body string, e *echo.Echo) (int, string) { diff --git a/utils.go b/utils.go index 818dca3..5dc5ac9 100644 --- a/utils.go +++ b/utils.go @@ -2,10 +2,31 @@ package main import ( "encoding/json" + "log" "os" + "strconv" "strings" ) +const sampleConfig = "config.sample.json" + +type clickhouseConfig struct { + Servers []string `json:"servers"` + DownTimeout int `json:"down_timeout"` + ConnectTimeout int `json:"connect_timeout"` +} + +// Config stores config data +type Config struct { + Listen string `json:"listen"` + Clickhouse clickhouseConfig `json:"clickhouse"` + FlushCount int `json:"flush_count"` + FlushInterval int `json:"flush_interval"` + DumpCheckInterval int `json:"dump_check_interval"` + DumpDir string `json:"dump_dir"` + Debug bool `json:"debug"` +} + // ReadJSON - read json file to struct func ReadJSON(fn string, v interface{}) error { file, err := os.Open(fn) @@ -21,3 +42,40 @@ func ReadJSON(fn string, v interface{}) error { func HasPrefix(s, prefix string) bool { return len(s) >= len(prefix) && strings.ToLower(s[0:len(prefix)]) == strings.ToLower(prefix) } + +// ReadConfig init config data +func ReadConfig(configFile string) (Config, error) { + cnf := Config{} + err := ReadJSON(configFile, &cnf) + if err != nil { + log.Printf("INFO: Config file %+v not found. Used%+v\n", configFile, sampleConfig) + err = ReadJSON(sampleConfig, &cnf) + if err != nil { + log.Printf("ERROR: read %+v failed\n", sampleConfig) + } + } + + flushCount := os.Getenv("CLICKHOUSE_FLUSH_COUNT") + if flushCount != "" { + cnf.FlushCount, err = strconv.Atoi(flushCount) + if err != nil { + log.Fatalf("Wrong flush count env: %+v\n", err.Error()) + } + } + + flushInterval := os.Getenv("CLICKHOUSE_FLUSH_INTERVAL") + if flushInterval != "" { + cnf.FlushInterval, err = strconv.Atoi(flushInterval) + if err != nil { + log.Fatalf("Wrong flush interval env: %+v\n", err.Error()) + } + } + + serversList := os.Getenv("CLICKHOUSE_SERVERS") + if serversList != "" { + cnf.Clickhouse.Servers = strings.Split(serversList, ",") + log.Printf("use servers: %+v\n", serversList) + } + + return cnf, err +} From c10fd9679ae8b1acff75bff0e468e64a598098d7 Mon Sep 17 00:00:00 2001 From: PaNick Date: Tue, 15 Oct 2019 23:37:10 +0300 Subject: [PATCH 10/29] improve coverage --- server_test.go | 4 ++++ utils.go | 31 ++++++++++++++++--------------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/server_test.go b/server_test.go index cf79ca1..b70016c 100644 --- a/server_test.go +++ b/server_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "os" "strings" "sync" "testing" @@ -110,8 +111,11 @@ func TestServer_MultiServer(t *testing.T) { assert.True(t, collect.Empty()) assert.True(t, sender.Empty()) + os.Setenv("DUMP_CHECK_INTERVAL", "10") cnf, err := ReadConfig("wrong_config.json") + os.Unsetenv("DUMP_CHECK_INTERVAL") assert.Nil(t, err) + assert.Equal(t, 10, cnf.DumpCheckInterval) go RunServer(cnf) time.Sleep(50) } diff --git a/utils.go b/utils.go index 5dc5ac9..4b3068a 100644 --- a/utils.go +++ b/utils.go @@ -43,6 +43,17 @@ func HasPrefix(s, prefix string) bool { return len(s) >= len(prefix) && strings.ToLower(s[0:len(prefix)]) == strings.ToLower(prefix) } +func readEnvInt(name string, value *int) { + s := os.Getenv(name) + if s != "" { + v, err := strconv.Atoi(s) + if err != nil { + log.Printf("ERROR: Wrong %+v env: %+v\n", name, err) + } + *value = v + } +} + // ReadConfig init config data func ReadConfig(configFile string) (Config, error) { cnf := Config{} @@ -55,21 +66,11 @@ func ReadConfig(configFile string) (Config, error) { } } - flushCount := os.Getenv("CLICKHOUSE_FLUSH_COUNT") - if flushCount != "" { - cnf.FlushCount, err = strconv.Atoi(flushCount) - if err != nil { - log.Fatalf("Wrong flush count env: %+v\n", err.Error()) - } - } - - flushInterval := os.Getenv("CLICKHOUSE_FLUSH_INTERVAL") - if flushInterval != "" { - cnf.FlushInterval, err = strconv.Atoi(flushInterval) - if err != nil { - log.Fatalf("Wrong flush interval env: %+v\n", err.Error()) - } - } + readEnvInt("CLICKHOUSE_FLUSH_COUNT", &cnf.FlushCount) + readEnvInt("CLICKHOUSE_FLUSH_INTERVAL", &cnf.FlushInterval) + readEnvInt("DUMP_CHECK_INTERVAL", &cnf.DumpCheckInterval) + readEnvInt("CLICKHOUSE_DOWN_TIMEOUT", &cnf.Clickhouse.DownTimeout) + readEnvInt("CLICKHOUSE_CONNECT_TIMEOUT", &cnf.Clickhouse.ConnectTimeout) serversList := os.Getenv("CLICKHOUSE_SERVERS") if serversList != "" { From 2bd9911bfe1d578c12d646dd1bbd5e3e044bfbcb Mon Sep 17 00:00:00 2001 From: PaNick Date: Wed, 16 Oct 2019 00:31:34 +0300 Subject: [PATCH 11/29] response status processing refactored --- .gitignore | 1 + clickhouse.go | 38 +++++++++++++++++++++++++------------- clickhouse_test.go | 9 ++++++--- dump.go | 6 +++--- sender.go | 6 +++--- server.go | 2 +- 6 files changed, 39 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index 35ba4ef..1ff7c15 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,7 @@ _testmain.go *.test *.prof dumptest +dumps .vscode .idea/ diff --git a/clickhouse.go b/clickhouse.go index 15e0016..6111c7f 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -1,6 +1,8 @@ package main import ( + "errors" + "fmt" "io/ioutil" "log" "net/http" @@ -36,6 +38,12 @@ type ClickhouseRequest struct { Content string } +// ErrServerIsDown - signals about server is down +var ErrServerIsDown = errors.New("server is down") + +// ErrNoServers - signals about no working servers +var ErrNoServers = errors.New("No working clickhouse servers") + // NewClickhouse - get clickhouse object func NewClickhouse(downTimeout int, connectTimeout int) (c *Clickhouse) { c = new(Clickhouse) @@ -140,9 +148,9 @@ func (c *Clickhouse) Run() { datas, err = c.Queue.Poll(1, time.Second*5) if err == nil { data := datas[0].(ClickhouseRequest) - resp, status := c.SendQuery(data.Params, data.Content) - if status != http.StatusOK { - log.Printf("ERROR: Send %+v: %+v\n", status, resp) + resp, status, err := c.SendQuery(data.Params, data.Content) + if err != nil { + log.Printf("ERROR: Send (%+v) %+v; response %+v\n", status, err, resp) c.Dump(data.Params, data.Content) } else { sentCounter.Inc() @@ -160,35 +168,39 @@ func (c *Clickhouse) WaitFlush() (err error) { } // SendQuery - sends query to server and return result -func (srv *ClickhouseServer) SendQuery(queryString string, data string) (response string, status int) { +func (srv *ClickhouseServer) SendQuery(queryString string, data string) (response string, status int, err error) { if srv.URL != "" { - log.Printf("INFO: send %+v rows to %+v of %+v\n", strings.Count(data, "\n")+1, srv.URL, queryString) resp, err := srv.Client.Post(srv.URL+"?"+queryString, "", strings.NewReader(data)) if err != nil { srv.Bad = true - return err.Error(), http.StatusBadGateway + return err.Error(), http.StatusBadGateway, ErrServerIsDown } buf, _ := ioutil.ReadAll(resp.Body) s := string(buf) - return s, resp.StatusCode + if resp.StatusCode >= 500 { + return s, resp.StatusCode, ErrServerIsDown + } else if resp.StatusCode >= 400 { + err = fmt.Errorf("wrong server status %+v", resp.StatusCode) + } + return s, resp.StatusCode, err } - return "", http.StatusOK + return "", http.StatusOK, err } // SendQuery - sends query to server and return result (with server cycle) -func (c *Clickhouse) SendQuery(queryString string, data string) (response string, status int) { +func (c *Clickhouse) SendQuery(queryString string, data string) (response string, status int, err error) { for { s := c.GetNextServer() if s != nil { - r, status := s.SendQuery(queryString, data) - if status == http.StatusBadGateway { + response, status, err = s.SendQuery(queryString, data) + if errors.Is(err, ErrServerIsDown) { continue } - return r, status + return response, status, err } - return "No working clickhouse servers", http.StatusBadGateway + return response, status, ErrNoServers } } diff --git a/clickhouse_test.go b/clickhouse_test.go index 544f8bd..bde7921 100644 --- a/clickhouse_test.go +++ b/clickhouse_test.go @@ -1,6 +1,7 @@ package main import ( + "errors" "net/http" "testing" "time" @@ -19,9 +20,10 @@ func TestClickhouse_GetNextServer(t *testing.T) { s.SendQuery("", "") s = c.GetNextServer() assert.Equal(t, "http://127.0.0.1:8124", s.URL) - resp, status := s.SendQuery("", "") + resp, status, err := s.SendQuery("", "") assert.NotEqual(t, "", resp) assert.Equal(t, http.StatusBadGateway, status) + assert.True(t, errors.Is(err, ErrServerIsDown)) assert.Equal(t, true, s.Bad) c.SendQuery("", "") } @@ -40,8 +42,9 @@ func TestClickhouse_SendQuery(t *testing.T) { c.AddServer("") c.GetNextServer() c.Servers[0].Bad = true - _, status := c.SendQuery("", "") - assert.Equal(t, http.StatusBadGateway, status) + _, status, err := c.SendQuery("", "") + assert.Equal(t, 0, status) + assert.True(t, errors.Is(err, ErrNoServers)) } func TestClickhouse_SendQuery1(t *testing.T) { diff --git a/dump.go b/dump.go index 004aca4..066b867 100644 --- a/dump.go +++ b/dump.go @@ -135,9 +135,9 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { if err != nil { log.Printf("ERROR: dump read: %+v\n", err) } - _, status := sender.SendQuery(data, "") - if status > 299 { - return fmt.Errorf("server status %+v", status) + _, status, err := sender.SendQuery(data, "") + if err != nil { + return fmt.Errorf("server error (%+v) %+v", status, err) } log.Printf("INFO: dump sended: %+v\n", f) err = d.DeleteDump(f) diff --git a/sender.go b/sender.go index f8f3441..c40252c 100644 --- a/sender.go +++ b/sender.go @@ -9,7 +9,7 @@ import ( // Sender interface for send requests type Sender interface { Send(queryString string, data string) - SendQuery(queryString string, data string) (response string, status int) + SendQuery(queryString string, data string) (response string, status int, err error) Len() int64 Empty() bool WaitFlush() (err error) @@ -27,10 +27,10 @@ func (s *fakeSender) Send(queryString string, data string) { s.sendHistory = append(s.sendHistory, queryString+" "+data) } -func (s *fakeSender) SendQuery(queryString string, data string) (response string, status int) { +func (s *fakeSender) SendQuery(queryString string, data string) (response string, status int, err error) { s.sendQueryHistory = append(s.sendQueryHistory, queryString+" "+data) log.Printf("DEBUG: send query: %+v\n", s.sendQueryHistory) - return "", http.StatusOK + return "", http.StatusOK, nil } func (s *fakeSender) Len() int64 { diff --git a/server.go b/server.go index a5dcbee..a8d92f6 100644 --- a/server.go +++ b/server.go @@ -57,7 +57,7 @@ func (server *Server) writeHandler(c echo.Context) error { go server.Collector.Push(params, content) return c.String(http.StatusOK, "") } - resp, status := server.Collector.Sender.SendQuery(params, content) + resp, status, _ := server.Collector.Sender.SendQuery(params, content) return c.String(status, resp) } From ae80965752bd69c633856a96770d916818708113 Mon Sep 17 00:00:00 2001 From: PaNick Date: Wed, 16 Oct 2019 00:40:41 +0300 Subject: [PATCH 12/29] small fixes --- clickhouse.go | 2 +- server_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clickhouse.go b/clickhouse.go index 6111c7f..46e97df 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -180,7 +180,7 @@ func (srv *ClickhouseServer) SendQuery(queryString string, data string) (respons buf, _ := ioutil.ReadAll(resp.Body) s := string(buf) if resp.StatusCode >= 500 { - return s, resp.StatusCode, ErrServerIsDown + err = ErrServerIsDown } else if resp.StatusCode >= 400 { err = fmt.Errorf("wrong server status %+v", resp.StatusCode) } diff --git a/server_test.go b/server_test.go index b70016c..120ba53 100644 --- a/server_test.go +++ b/server_test.go @@ -117,7 +117,7 @@ func TestServer_MultiServer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 10, cnf.DumpCheckInterval) go RunServer(cnf) - time.Sleep(50) + time.Sleep(1000) } func request(method, path string, body string, e *echo.Echo) (int, string) { From 68692cf0a909f47ace92d28532aa9fb1081dd6d2 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Wed, 16 Oct 2019 11:24:23 +0300 Subject: [PATCH 13/29] some dumper fixes --- dump.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dump.go b/dump.go index 066b867..2004c84 100644 --- a/dump.go +++ b/dump.go @@ -126,14 +126,14 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { return err } if err != nil { - log.Printf("ERROR: dump search: %+v\n", err) + return fmt.Errorf("Dump search error: %+v", err) } if f == "" { return nil } data, err := d.GetDumpData(f) if err != nil { - log.Printf("ERROR: dump read: %+v\n", err) + return fmt.Errorf("Dump read error: %+v", err) } _, status, err := sender.SendQuery(data, "") if err != nil { @@ -143,7 +143,7 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { err = d.DeleteDump(f) if err != nil { d.LockedFiles[f] = true - log.Printf("ERROR: dump delete: %+v\n", err) + return fmt.Errorf("Dump delete error: %+v", err) } return err } @@ -161,7 +161,7 @@ func (d *FileDumper) Listen(sender Sender, interval int) { err := d.ProcessNextDump(sender) if err != nil { if !errors.Is(err, ErrNoDumps) { - log.Printf("WARNING: %+v\n", err) + log.Printf("ERROR: %+v\n", err) } break } From e8a6b3fd10f75186e08d3cb79b61b79b9764e074 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Wed, 16 Oct 2019 12:06:40 +0300 Subject: [PATCH 14/29] fix metrics --- collector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/collector.go b/collector.go index 0dc2782..fce08e0 100644 --- a/collector.go +++ b/collector.go @@ -161,6 +161,7 @@ func (c *Collector) Push(params string, content string) { if ok { table.Add(content) c.mu.RUnlock() + pushCounter.Inc() return } c.mu.RUnlock() From 6a8a4072fc550d14a8d913a13eeda25c0f3628d0 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Wed, 16 Oct 2019 12:19:17 +0300 Subject: [PATCH 15/29] small dumper fix --- dump.go | 13 +++++++++---- dump_test.go | 3 ++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/dump.go b/dump.go index 2004c84..a574b8f 100644 --- a/dump.go +++ b/dump.go @@ -36,10 +36,12 @@ func (d *FileDumper) makePath(id string) string { return path.Join(d.Path, id) } -func (d *FileDumper) checkDir() error { +func (d *FileDumper) checkDir(create bool) error { _, err := os.Stat(d.Path) if os.IsNotExist(err) { - return os.Mkdir(d.Path, 0777) + if create { + return os.Mkdir(d.Path, 0777) + } } return err } @@ -60,7 +62,7 @@ func NewDumper(path string) *FileDumper { func (d *FileDumper) Dump(params string, data string) error { d.mu.Lock() defer d.mu.Unlock() - err := d.checkDir() + err := d.checkDir(true) if err != nil { return err } @@ -76,7 +78,10 @@ func (d *FileDumper) Dump(params string, data string) error { // GetDump - get dump file from filesystem func (d *FileDumper) GetDump() (string, error) { - err := d.checkDir() + err := d.checkDir(false) + if os.IsNotExist(err) { + return "", ErrNoDumps + } if err != nil { return "", err } diff --git a/dump_test.go b/dump_test.go index 63c454c..942760b 100644 --- a/dump_test.go +++ b/dump_test.go @@ -35,5 +35,6 @@ func TestDump_Dump(t *testing.T) { err = dumper.ProcessNextDump(sender) assert.True(t, errors.Is(err, ErrNoDumps)) - os.Remove(dumpDir) + err = os.Remove(dumpDir) + assert.Nil(t, err) } From 22fa4e2793438c9976ce9163fe4dd8c9b2af2ddd Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Thu, 17 Oct 2019 19:53:16 +0300 Subject: [PATCH 16/29] possibility to disable dump sender --- README.md | 2 +- server.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 1868e30..518418e 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6') "listen": ":8124", "flush_count": 10000, // check by \n char "flush_interval": 1000, // milliseconds - "dump_check_interval": 30, // interval for try to send dumps (seconds) + "dump_check_interval": 30, // interval for try to send dumps (seconds); -1 to disable "debug": false, // log incoming requests "dump_dir": "dumps", // directory for dump unsended data (if clickhouse errors) "clickhouse": { diff --git a/server.go b/server.go index a8d92f6..b53cac1 100644 --- a/server.go +++ b/server.go @@ -129,7 +129,9 @@ func RunServer(cnf Config) { } }() - dumper.Listen(sender, cnf.DumpCheckInterval) + if cnf.DumpCheckInterval >= 0 { + dumper.Listen(sender, cnf.DumpCheckInterval) + } err := srv.Start() if err != nil { From ddcb811975ccda61e26915b3a8a0692fcaddec68 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Thu, 17 Oct 2019 19:57:32 +0300 Subject: [PATCH 17/29] update go version in dockerfile --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index e4e0b2a..f39b4b6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.12.4 as builder +FROM golang:1.13.1 as builder ARG GOPROXY ENV GOOS=linux \ From 71e9660b2c13ef916cbf123071ae376056f319be Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Fri, 18 Oct 2019 00:35:19 +0300 Subject: [PATCH 18/29] dump sender fixes and some collector update --- clickhouse.go | 11 ++++++++--- collector.go | 32 ++++++++++++++++++++++++++++---- collector_test.go | 7 +++++++ config.sample.json | 2 +- dump.go | 12 +++++++++++- utils.go | 2 +- 6 files changed, 56 insertions(+), 10 deletions(-) diff --git a/clickhouse.go b/clickhouse.go index 46e97df..88f6135 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -170,9 +170,13 @@ func (c *Clickhouse) WaitFlush() (err error) { // SendQuery - sends query to server and return result func (srv *ClickhouseServer) SendQuery(queryString string, data string) (response string, status int, err error) { if srv.URL != "" { - log.Printf("INFO: send %+v rows to %+v of %+v\n", strings.Count(data, "\n")+1, srv.URL, queryString) - - resp, err := srv.Client.Post(srv.URL+"?"+queryString, "", strings.NewReader(data)) + lines := strings.Split(data, "\n") + log.Printf("INFO: send %+v rows to %+v of %+v\n", len(lines)-2, srv.URL, lines[0]) + url := srv.URL + if queryString != "" { + url += "?" + queryString + } + resp, err := srv.Client.Post(url, "", strings.NewReader(data)) if err != nil { srv.Bad = true return err.Error(), http.StatusBadGateway, ErrServerIsDown @@ -197,6 +201,7 @@ func (c *Clickhouse) SendQuery(queryString string, data string) (response string if s != nil { response, status, err = s.SendQuery(queryString, data) if errors.Is(err, ErrServerIsDown) { + log.Printf("ERROR: server down (%+v): %+v\n", status, response) continue } return response, status, err diff --git a/collector.go b/collector.go index fce08e0..9d4841a 100644 --- a/collector.go +++ b/collector.go @@ -14,12 +14,15 @@ var regexValues = regexp.MustCompile("(?i)\\svalues\\s") // Table - store query table info type Table struct { Name string + Query string + Params string Rows []string count int FlushCount int FlushInterval int mu sync.Mutex Sender Sender + // todo add Last Error } // Collector - query collector @@ -58,8 +61,8 @@ func (t *Table) Content() string { // Flush - sends collected data in table to clickhouse func (t *Table) Flush() { - rows := t.Content() - t.Sender.Send(t.Name, rows) + data := t.Query + "\n" + t.Content() + "\n" + t.Sender.Send(t.Params, data) t.Rows = make([]string, 0, t.FlushCount) t.count = 0 } @@ -99,10 +102,9 @@ func (t *Table) RunTimer() { // Add - Adding query to table func (t *Table) Add(text string) { - count := strings.Count(text, "\n") + 1 t.mu.Lock() defer t.mu.Unlock() - t.count += count + t.count++ t.Rows = append(t.Rows, text) if len(t.Rows) >= t.FlushCount { t.Flush() @@ -147,8 +149,30 @@ func (c *Collector) AddTable(name string) { c.addTable(name) } +func (c *Collector) separateQuery(name string) (query string, params string) { + items := strings.Split(name, "&") + for _, p := range items { + if HasPrefix(p, "query=") { + query = p[6:] + } else { + params += "&" + p + } + } + if len(params) > 0 { + params = strings.TrimSpace(params[1:]) + } + q, err := url.QueryUnescape(query) + if err != nil { + return "", name + } + return q, params +} + func (c *Collector) addTable(name string) *Table { t := NewTable(name, c.Sender, c.Count, c.FlushInterval) + query, params := c.separateQuery(name) + t.Query = query + t.Params = params c.Tables[name] = t t.RunTimer() return t diff --git a/collector_test.go b/collector_test.go index ae5cd8c..e7c4b9a 100644 --- a/collector_test.go +++ b/collector_test.go @@ -122,6 +122,13 @@ func TestCollector_ParseQuery(t *testing.T) { assert.False(t, insert) } +func TestCollector_separateQuery(t *testing.T) { + c := NewCollector(&fakeSender{}, 1000, 1000) + query, params := c.separateQuery(escParamsAndSelect) + assert.Equal(t, qSelect, query) + assert.Equal(t, qParams, params) +} + func TestTable_CheckFlush(t *testing.T) { c := NewCollector(&fakeSender{}, 1000, 1000) c.Push(qTitle, qContent) diff --git a/config.sample.json b/config.sample.json index 93ede58..e8067f6 100644 --- a/config.sample.json +++ b/config.sample.json @@ -6,7 +6,7 @@ "debug": false, "dump_dir": "dumps", "clickhouse": { - "down_timeout": 300, + "down_timeout": 60, "connect_timeout": 10, "servers": [ "http://127.0.0.1:8123" diff --git a/dump.go b/dump.go index a574b8f..aa7f2e1 100644 --- a/dump.go +++ b/dump.go @@ -9,6 +9,7 @@ import ( "path" "path/filepath" "strconv" + "strings" "sync" "time" ) @@ -140,7 +141,16 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { if err != nil { return fmt.Errorf("Dump read error: %+v", err) } - _, status, err := sender.SendQuery(data, "") + if data == "" { + return nil + } + params := "" + lines := strings.Split(data, "\n") + if !HasPrefix(lines[0], "insert") { + params = lines[0] + data = strings.Join(lines[1:], "\n") + } + _, status, err := sender.SendQuery(params, data) if err != nil { return fmt.Errorf("server error (%+v) %+v", status, err) } diff --git a/utils.go b/utils.go index 4b3068a..bfc0028 100644 --- a/utils.go +++ b/utils.go @@ -75,8 +75,8 @@ func ReadConfig(configFile string) (Config, error) { serversList := os.Getenv("CLICKHOUSE_SERVERS") if serversList != "" { cnf.Clickhouse.Servers = strings.Split(serversList, ",") - log.Printf("use servers: %+v\n", serversList) } + log.Printf("use servers: %+v\n", strings.Join(cnf.Clickhouse.Servers, ", ")) return cnf, err } From d4b79ecc71c9a887f57c0084c52f2cd048b6bd84 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Fri, 18 Oct 2019 00:37:22 +0300 Subject: [PATCH 19/29] fix test --- collector_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector_test.go b/collector_test.go index e7c4b9a..3906fd9 100644 --- a/collector_test.go +++ b/collector_test.go @@ -35,7 +35,7 @@ func TestCollector_Push(t *testing.T) { for i := 0; i < 10400; i++ { c.Push(escTitle, qContent) } - assert.Equal(t, 800, c.Tables[escTitle].GetCount()) + assert.Equal(t, 400, c.Tables[escTitle].GetCount()) } func BenchmarkCollector_ParseQuery(b *testing.B) { From ed9a2150546bfe85091d4541ad472c8c94b463c9 Mon Sep 17 00:00:00 2001 From: PaNick Date: Sun, 20 Oct 2019 00:34:37 +0300 Subject: [PATCH 20/29] fix tabseparated + some refactoring --- clickhouse.go | 28 ++++++++++++++-------------- clickhouse_test.go | 10 +++++----- collector.go | 28 ++++++++++++++++++++++++---- collector_test.go | 6 ++++++ dump.go | 2 +- sender.go | 12 ++++++------ server.go | 2 +- 7 files changed, 57 insertions(+), 31 deletions(-) diff --git a/clickhouse.go b/clickhouse.go index 88f6135..798dc86 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -35,7 +35,9 @@ type Clickhouse struct { // ClickhouseRequest - request struct for queue type ClickhouseRequest struct { Params string + Query string Content string + Count int } // ErrServerIsDown - signals about server is down @@ -113,10 +115,9 @@ func (c *Clickhouse) GetNextServer() (srv *ClickhouseServer) { } // Send - send request to next server -func (c *Clickhouse) Send(queryString string, data string) { - req := ClickhouseRequest{queryString, data} +func (c *Clickhouse) Send(r *ClickhouseRequest) { c.wg.Add(1) - c.Queue.Put(req) + c.Queue.Put(r) } // Dump - save query to file @@ -147,8 +148,8 @@ func (c *Clickhouse) Run() { for { datas, err = c.Queue.Poll(1, time.Second*5) if err == nil { - data := datas[0].(ClickhouseRequest) - resp, status, err := c.SendQuery(data.Params, data.Content) + data := datas[0].(*ClickhouseRequest) + resp, status, err := c.SendQuery(data) if err != nil { log.Printf("ERROR: Send (%+v) %+v; response %+v\n", status, err, resp) c.Dump(data.Params, data.Content) @@ -168,15 +169,14 @@ func (c *Clickhouse) WaitFlush() (err error) { } // SendQuery - sends query to server and return result -func (srv *ClickhouseServer) SendQuery(queryString string, data string) (response string, status int, err error) { +func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, status int, err error) { if srv.URL != "" { - lines := strings.Split(data, "\n") - log.Printf("INFO: send %+v rows to %+v of %+v\n", len(lines)-2, srv.URL, lines[0]) url := srv.URL - if queryString != "" { - url += "?" + queryString + if r.Params != "" { + url += "?" + r.Params } - resp, err := srv.Client.Post(url, "", strings.NewReader(data)) + log.Printf("INFO: send %+v rows to %+v of %+v\n", r.Count, url, r.Query) + resp, err := srv.Client.Post(url, "", strings.NewReader(r.Content)) if err != nil { srv.Bad = true return err.Error(), http.StatusBadGateway, ErrServerIsDown @@ -186,7 +186,7 @@ func (srv *ClickhouseServer) SendQuery(queryString string, data string) (respons if resp.StatusCode >= 500 { err = ErrServerIsDown } else if resp.StatusCode >= 400 { - err = fmt.Errorf("wrong server status %+v", resp.StatusCode) + err = fmt.Errorf("Wrong server status %+v:\nresponse: %+v\nrequest: %#v", resp.StatusCode, s, r.Content) } return s, resp.StatusCode, err } @@ -195,11 +195,11 @@ func (srv *ClickhouseServer) SendQuery(queryString string, data string) (respons } // SendQuery - sends query to server and return result (with server cycle) -func (c *Clickhouse) SendQuery(queryString string, data string) (response string, status int, err error) { +func (c *Clickhouse) SendQuery(r *ClickhouseRequest) (response string, status int, err error) { for { s := c.GetNextServer() if s != nil { - response, status, err = s.SendQuery(queryString, data) + response, status, err = s.SendQuery(r) if errors.Is(err, ErrServerIsDown) { log.Printf("ERROR: server down (%+v): %+v\n", status, response) continue diff --git a/clickhouse_test.go b/clickhouse_test.go index bde7921..6239743 100644 --- a/clickhouse_test.go +++ b/clickhouse_test.go @@ -17,21 +17,21 @@ func TestClickhouse_GetNextServer(t *testing.T) { c.AddServer("http://127.0.0.1:8123") s := c.GetNextServer() assert.Equal(t, "", s.URL) - s.SendQuery("", "") + s.SendQuery(&ClickhouseRequest{}) s = c.GetNextServer() assert.Equal(t, "http://127.0.0.1:8124", s.URL) - resp, status, err := s.SendQuery("", "") + resp, status, err := s.SendQuery(&ClickhouseRequest{}) assert.NotEqual(t, "", resp) assert.Equal(t, http.StatusBadGateway, status) assert.True(t, errors.Is(err, ErrServerIsDown)) assert.Equal(t, true, s.Bad) - c.SendQuery("", "") + c.SendQuery(&ClickhouseRequest{}) } func TestClickhouse_Send(t *testing.T) { c := NewClickhouse(300, 10) c.AddServer("") - c.Send("", "") + c.Send(&ClickhouseRequest{}) for !c.Queue.Empty() { time.Sleep(10) } @@ -42,7 +42,7 @@ func TestClickhouse_SendQuery(t *testing.T) { c.AddServer("") c.GetNextServer() c.Servers[0].Bad = true - _, status, err := c.SendQuery("", "") + _, status, err := c.SendQuery(&ClickhouseRequest{}) assert.Equal(t, 0, status) assert.True(t, errors.Is(err, ErrNoServers)) } diff --git a/collector.go b/collector.go index 9d4841a..3f48ec2 100644 --- a/collector.go +++ b/collector.go @@ -8,12 +8,17 @@ import ( "time" ) +const formatValues = "values" +const formatTabSeparated = "tabseparated" + var regexFormat = regexp.MustCompile("(?i)format\\s\\S+(\\s+)") var regexValues = regexp.MustCompile("(?i)\\svalues\\s") +var regexGetFormat = regexp.MustCompile("(?i)format\\s(\\S+)") // Table - store query table info type Table struct { Name string + Format string Query string Params string Rows []string @@ -56,13 +61,18 @@ func NewCollector(sender Sender, count int, interval int) (c *Collector) { // Content - get text content of rowsfor query func (t *Table) Content() string { - return strings.Join(t.Rows, "\n") + return t.Query + "\n" + strings.Join(t.Rows, "\n") } // Flush - sends collected data in table to clickhouse func (t *Table) Flush() { - data := t.Query + "\n" + t.Content() + "\n" - t.Sender.Send(t.Params, data) + req := ClickhouseRequest{ + Params: t.Params, + Query: t.Query, + Content: t.Content(), + Count: t.count, + } + t.Sender.Send(&req) t.Rows = make([]string, 0, t.FlushCount) t.count = 0 } @@ -168,11 +178,21 @@ func (c *Collector) separateQuery(name string) (query string, params string) { return q, params } +func (c *Collector) getFormat(query string) (format string) { + format = formatValues + f := regexGetFormat.FindSubmatch([]byte(query)) + if len(f) > 1 { + format = strings.TrimSpace(string(f[1])) + } + return format +} + func (c *Collector) addTable(name string) *Table { t := NewTable(name, c.Sender, c.Count, c.FlushInterval) query, params := c.separateQuery(name) t.Query = query t.Params = params + t.Format = c.getFormat(query) c.Tables[name] = t t.RunTimer() return t @@ -242,7 +262,7 @@ func (c *Collector) ParseQuery(queryString string, body string) (params string, params = "query=" + url.QueryEscape(q) } } - return params, content, insert + return strings.TrimSpace(params), strings.TrimSpace(content), insert } // Parse - parsing text for query and data diff --git a/collector_test.go b/collector_test.go index 3906fd9..19f7864 100644 --- a/collector_test.go +++ b/collector_test.go @@ -129,6 +129,12 @@ func TestCollector_separateQuery(t *testing.T) { assert.Equal(t, qParams, params) } +func TestTable_getFormat(t *testing.T) { + c := NewCollector(&fakeSender{}, 1000, 1000) + f := c.getFormat(qTitle) + assert.Equal(t, "TabSeparated", f) +} + func TestTable_CheckFlush(t *testing.T) { c := NewCollector(&fakeSender{}, 1000, 1000) c.Push(qTitle, qContent) diff --git a/dump.go b/dump.go index aa7f2e1..02859fb 100644 --- a/dump.go +++ b/dump.go @@ -150,7 +150,7 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { params = lines[0] data = strings.Join(lines[1:], "\n") } - _, status, err := sender.SendQuery(params, data) + _, status, err := sender.SendQuery(&ClickhouseRequest{Params: params, Content: data}) if err != nil { return fmt.Errorf("server error (%+v) %+v", status, err) } diff --git a/sender.go b/sender.go index c40252c..8cb082f 100644 --- a/sender.go +++ b/sender.go @@ -8,8 +8,8 @@ import ( // Sender interface for send requests type Sender interface { - Send(queryString string, data string) - SendQuery(queryString string, data string) (response string, status int, err error) + Send(r *ClickhouseRequest) + SendQuery(r *ClickhouseRequest) (response string, status int, err error) Len() int64 Empty() bool WaitFlush() (err error) @@ -21,14 +21,14 @@ type fakeSender struct { mu sync.Mutex } -func (s *fakeSender) Send(queryString string, data string) { +func (s *fakeSender) Send(r *ClickhouseRequest) { s.mu.Lock() defer s.mu.Unlock() - s.sendHistory = append(s.sendHistory, queryString+" "+data) + s.sendHistory = append(s.sendHistory, r.Params+" "+r.Content) } -func (s *fakeSender) SendQuery(queryString string, data string) (response string, status int, err error) { - s.sendQueryHistory = append(s.sendQueryHistory, queryString+" "+data) +func (s *fakeSender) SendQuery(r *ClickhouseRequest) (response string, status int, err error) { + s.sendQueryHistory = append(s.sendQueryHistory, r.Params+r.Content) log.Printf("DEBUG: send query: %+v\n", s.sendQueryHistory) return "", http.StatusOK, nil } diff --git a/server.go b/server.go index b53cac1..654e459 100644 --- a/server.go +++ b/server.go @@ -57,7 +57,7 @@ func (server *Server) writeHandler(c echo.Context) error { go server.Collector.Push(params, content) return c.String(http.StatusOK, "") } - resp, status, _ := server.Collector.Sender.SendQuery(params, content) + resp, status, _ := server.Collector.Sender.SendQuery(&ClickhouseRequest{Params: params, Content: content}) return c.String(status, resp) } From c18876f4b413bca906fbd6cdeb21006cee1785b4 Mon Sep 17 00:00:00 2001 From: PaNick Date: Sun, 20 Oct 2019 19:27:57 +0300 Subject: [PATCH 21/29] simple dump resend priority --- clickhouse.go | 10 +++++++--- dump.go | 12 +++++++----- dump_test.go | 6 +++--- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/clickhouse.go b/clickhouse.go index 798dc86..f5e3eb8 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -121,12 +121,12 @@ func (c *Clickhouse) Send(r *ClickhouseRequest) { } // Dump - save query to file -func (c *Clickhouse) Dump(params string, data string) error { +func (c *Clickhouse) Dump(params string, data string, prefix string) error { dumpCounter.Inc() if c.Dumper != nil { c.mu.Lock() defer c.mu.Unlock() - return c.Dumper.Dump(params, data) + return c.Dumper.Dump(params, data, prefix) } return nil } @@ -152,7 +152,11 @@ func (c *Clickhouse) Run() { resp, status, err := c.SendQuery(data) if err != nil { log.Printf("ERROR: Send (%+v) %+v; response %+v\n", status, err, resp) - c.Dump(data.Params, data.Content) + prefix := "1" + if status >= 400 && status < 500 { + prefix = "2" + } + c.Dump(data.Params, data.Content, prefix) } else { sentCounter.Inc() } diff --git a/dump.go b/dump.go index 02859fb..c03f346 100644 --- a/dump.go +++ b/dump.go @@ -8,6 +8,7 @@ import ( "os" "path" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -21,7 +22,7 @@ var ErrNoDumps = errors.New("No dumps") // Dumper - interface for dump data type Dumper interface { - Dump(params string, data string) error + Dump(params string, data string, prefix string) error } // FileDumper - dumps data to file system @@ -47,8 +48,8 @@ func (d *FileDumper) checkDir(create bool) error { return err } -func (d *FileDumper) dumpName(num int) string { - return "dump" + d.DumpPrefix + "-" + strconv.Itoa(num) + ".dmp" +func (d *FileDumper) dumpName(num int, prefix string) string { + return "dump" + d.DumpPrefix + prefix + "-" + strconv.Itoa(num) + ".dmp" } // NewDumper - create new dumper @@ -60,7 +61,7 @@ func NewDumper(path string) *FileDumper { } // Dump - dumps data to files -func (d *FileDumper) Dump(params string, data string) error { +func (d *FileDumper) Dump(params string, data string, prefix string) error { d.mu.Lock() defer d.mu.Unlock() err := d.checkDir(true) @@ -69,7 +70,7 @@ func (d *FileDumper) Dump(params string, data string) error { } d.DumpNum++ err = ioutil.WriteFile( - path.Join(d.Path, d.dumpName(d.DumpNum)), []byte(params+"\n"+data), 0644, + path.Join(d.Path, d.dumpName(d.DumpNum, prefix)), []byte(params+"\n"+data), 0644, ) if err != nil { log.Printf("ERROR: dump to file: %+v\n", err) @@ -97,6 +98,7 @@ func (d *FileDumper) GetDump() (string, error) { dumpFiles = append(dumpFiles, f.Name()) } } + sort.Strings(dumpFiles) queuedDumps.Set(float64(len(dumpFiles))) diff --git a/dump_test.go b/dump_test.go index 942760b..fe174bd 100644 --- a/dump_test.go +++ b/dump_test.go @@ -15,9 +15,9 @@ func TestDump_Dump(t *testing.T) { dumper := NewDumper(dumpDir) c.Dumper = dumper c.AddServer("") - c.Dump("eee", "eee") + c.Dump("eee", "eee", "") assert.True(t, c.Empty()) - buf, err := dumper.GetDumpData(dumper.dumpName(1)) + buf, err := dumper.GetDumpData(dumper.dumpName(1, "")) assert.Nil(t, err) assert.Equal(t, "eee\neee", string(buf)) @@ -30,7 +30,7 @@ func TestDump_Dump(t *testing.T) { assert.Len(t, sender.sendQueryHistory, 1) dumper.Listen(sender, 1) - c.Dump("eee", "eee") + c.Dump("eee", "eee", "") time.Sleep(time.Second) err = dumper.ProcessNextDump(sender) assert.True(t, errors.Is(err, ErrNoDumps)) From 1809c8c9352fe57c4da077301c9dfacdbba25ea3 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Sun, 20 Oct 2019 19:54:05 +0300 Subject: [PATCH 22/29] fix dumps in 4xx response --- clickhouse.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clickhouse.go b/clickhouse.go index f5e3eb8..5ff8d0b 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -153,7 +153,7 @@ func (c *Clickhouse) Run() { if err != nil { log.Printf("ERROR: Send (%+v) %+v; response %+v\n", status, err, resp) prefix := "1" - if status >= 400 && status < 500 { + if status >= 400 && status < 502 { prefix = "2" } c.Dump(data.Params, data.Content, prefix) @@ -187,7 +187,8 @@ func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, s } buf, _ := ioutil.ReadAll(resp.Body) s := string(buf) - if resp.StatusCode >= 500 { + if resp.StatusCode >= 502 { + srv.Bad = true err = ErrServerIsDown } else if resp.StatusCode >= 400 { err = fmt.Errorf("Wrong server status %+v:\nresponse: %+v\nrequest: %#v", resp.StatusCode, s, r.Content) From ca717cb91af51235f29e53ceff2d7c8583232d13 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Mon, 21 Oct 2019 11:14:12 +0300 Subject: [PATCH 23/29] add status to dump name --- .gitignore | 3 +++ clickhouse.go | 6 +++--- dump.go | 10 +++++----- dump_test.go | 6 +++--- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index 1ff7c15..626bca1 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,6 @@ dumps .vscode .idea/ +clickhouse-bulk +debug +check.py \ No newline at end of file diff --git a/clickhouse.go b/clickhouse.go index 5ff8d0b..cf22795 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -121,12 +121,12 @@ func (c *Clickhouse) Send(r *ClickhouseRequest) { } // Dump - save query to file -func (c *Clickhouse) Dump(params string, data string, prefix string) error { +func (c *Clickhouse) Dump(params string, data string, prefix string, status int) error { dumpCounter.Inc() if c.Dumper != nil { c.mu.Lock() defer c.mu.Unlock() - return c.Dumper.Dump(params, data, prefix) + return c.Dumper.Dump(params, data, prefix, status) } return nil } @@ -156,7 +156,7 @@ func (c *Clickhouse) Run() { if status >= 400 && status < 502 { prefix = "2" } - c.Dump(data.Params, data.Content, prefix) + c.Dump(data.Params, data.Content, prefix, status) } else { sentCounter.Inc() } diff --git a/dump.go b/dump.go index c03f346..ee1c668 100644 --- a/dump.go +++ b/dump.go @@ -22,7 +22,7 @@ var ErrNoDumps = errors.New("No dumps") // Dumper - interface for dump data type Dumper interface { - Dump(params string, data string, prefix string) error + Dump(params string, data string, prefix string, status int) error } // FileDumper - dumps data to file system @@ -48,8 +48,8 @@ func (d *FileDumper) checkDir(create bool) error { return err } -func (d *FileDumper) dumpName(num int, prefix string) string { - return "dump" + d.DumpPrefix + prefix + "-" + strconv.Itoa(num) + ".dmp" +func (d *FileDumper) dumpName(num int, prefix string, status int) string { + return "dump" + d.DumpPrefix + prefix + "-" + strconv.Itoa(num) + "-" + strconv.Itoa(status) + ".dmp" } // NewDumper - create new dumper @@ -61,7 +61,7 @@ func NewDumper(path string) *FileDumper { } // Dump - dumps data to files -func (d *FileDumper) Dump(params string, data string, prefix string) error { +func (d *FileDumper) Dump(params string, data string, prefix string, status int) error { d.mu.Lock() defer d.mu.Unlock() err := d.checkDir(true) @@ -70,7 +70,7 @@ func (d *FileDumper) Dump(params string, data string, prefix string) error { } d.DumpNum++ err = ioutil.WriteFile( - path.Join(d.Path, d.dumpName(d.DumpNum, prefix)), []byte(params+"\n"+data), 0644, + path.Join(d.Path, d.dumpName(d.DumpNum, prefix, status)), []byte(params+"\n"+data), 0644, ) if err != nil { log.Printf("ERROR: dump to file: %+v\n", err) diff --git a/dump_test.go b/dump_test.go index fe174bd..1c24210 100644 --- a/dump_test.go +++ b/dump_test.go @@ -15,9 +15,9 @@ func TestDump_Dump(t *testing.T) { dumper := NewDumper(dumpDir) c.Dumper = dumper c.AddServer("") - c.Dump("eee", "eee", "") + c.Dump("eee", "eee", "", 502) assert.True(t, c.Empty()) - buf, err := dumper.GetDumpData(dumper.dumpName(1, "")) + buf, err := dumper.GetDumpData(dumper.dumpName(1, "", 502)) assert.Nil(t, err) assert.Equal(t, "eee\neee", string(buf)) @@ -30,7 +30,7 @@ func TestDump_Dump(t *testing.T) { assert.Len(t, sender.sendQueryHistory, 1) dumper.Listen(sender, 1) - c.Dump("eee", "eee", "") + c.Dump("eee", "eee", "", 502) time.Sleep(time.Second) err = dumper.ProcessNextDump(sender) assert.True(t, errors.Is(err, ErrNoDumps)) From 4dc82cee494dacbae7b0b0876479ec0cc74b9366 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Mon, 21 Oct 2019 12:32:29 +0300 Subject: [PATCH 24/29] increase dump check interval --- config.sample.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.sample.json b/config.sample.json index e8067f6..518167e 100644 --- a/config.sample.json +++ b/config.sample.json @@ -2,7 +2,7 @@ "listen": ":8124", "flush_count": 10000, "flush_interval": 1000, - "dump_check_interval": 30, + "dump_check_interval": 300, "debug": false, "dump_dir": "dumps", "clickhouse": { From f3b5525b4b92c59efca47441b6ffb9acbe3b1584 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Mon, 21 Oct 2019 12:33:32 +0300 Subject: [PATCH 25/29] update config sample --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 518418e..7e170d6 100644 --- a/README.md +++ b/README.md @@ -58,11 +58,11 @@ INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6') "listen": ":8124", "flush_count": 10000, // check by \n char "flush_interval": 1000, // milliseconds - "dump_check_interval": 30, // interval for try to send dumps (seconds); -1 to disable + "dump_check_interval": 300, // interval for try to send dumps (seconds); -1 to disable "debug": false, // log incoming requests "dump_dir": "dumps", // directory for dump unsended data (if clickhouse errors) "clickhouse": { - "down_timeout": 300, // wait if server in down (seconds) + "down_timeout": 60, // wait if server in down (seconds) "connect_timeout": 10, // wait for server connect (seconds) "servers": [ "http://127.0.0.1:8123" From 22c5ddf1eb7f4d1f5f81eb3a2f0c7febfc6b7af0 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Mon, 21 Oct 2019 12:52:44 +0300 Subject: [PATCH 26/29] fix metrics --- README.md | 10 ++++++++++ metrics.go | 7 +++++++ 2 files changed, 17 insertions(+) diff --git a/README.md b/README.md index 7e170d6..1e88840 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,16 @@ INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6') `./clickhouse-bulk` and send queries to :8124 +### Metrics +manual check main metrics +`curl -s http://127.0.0.1:8124/metrics | grep "^ch_"` +* `ch_bad_servers 0` - actual count of bad servers +* `ch_dump_count 0` - dumps saved from launch +* `ch_queued_dumps 0` - actual dump files id directory +* `ch_good_servers 1` - actual good servers count +* `ch_received_count 40` - received requests count from launch +* `ch_sent_count 1` - sent request count from launch + ### Tips diff --git a/metrics.go b/metrics.go index 7e2602b..2822bb7 100644 --- a/metrics.go +++ b/metrics.go @@ -7,27 +7,33 @@ import ( var pushCounter = prometheus.NewCounter( prometheus.CounterOpts{ Name: "ch_received_count", + Help: "Received requests count from launch", }) var sentCounter = prometheus.NewCounter( prometheus.CounterOpts{ Name: "ch_sent_count", + Help: "Sent request count from launch", }) var dumpCounter = prometheus.NewCounter( prometheus.CounterOpts{ Name: "ch_dump_count", + Help: "Dumps saved from launch", }) var goodServers = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "ch_good_servers", + Help: "Actual good servers count", }) var badServers = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "ch_bad_servers", + Help: "Actual count of bad servers", }) var queuedDumps = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "ch_queued_dumps", + Help: "Actual dump files id directory", }) // InitMetrics - init prometheus metrics @@ -36,6 +42,7 @@ func InitMetrics() { prometheus.MustRegister(pushCounter) prometheus.MustRegister(sentCounter) prometheus.MustRegister(dumpCounter) + prometheus.MustRegister(queuedDumps) prometheus.MustRegister(goodServers) prometheus.MustRegister(badServers) } From f33459df7f5d4a9a6785ac5e686cec7886391a7a Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Mon, 21 Oct 2019 14:57:35 +0300 Subject: [PATCH 27/29] add server response to dumps --- clickhouse.go | 6 +++--- dump.go | 46 +++++++++++++++++++++++++++------------------- dump_test.go | 10 +++++----- 3 files changed, 35 insertions(+), 27 deletions(-) diff --git a/clickhouse.go b/clickhouse.go index cf22795..0689c3a 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -121,12 +121,12 @@ func (c *Clickhouse) Send(r *ClickhouseRequest) { } // Dump - save query to file -func (c *Clickhouse) Dump(params string, data string, prefix string, status int) error { +func (c *Clickhouse) Dump(params string, content string, response string, prefix string, status int) error { dumpCounter.Inc() if c.Dumper != nil { c.mu.Lock() defer c.mu.Unlock() - return c.Dumper.Dump(params, data, prefix, status) + return c.Dumper.Dump(params, content, response, prefix, status) } return nil } @@ -156,7 +156,7 @@ func (c *Clickhouse) Run() { if status >= 400 && status < 502 { prefix = "2" } - c.Dump(data.Params, data.Content, prefix, status) + c.Dump(data.Params, data.Content, resp, prefix, status) } else { sentCounter.Inc() } diff --git a/dump.go b/dump.go index ee1c668..4d9b6a2 100644 --- a/dump.go +++ b/dump.go @@ -16,13 +16,14 @@ import ( ) const defaultDumpCheckInterval = 30 +const dumpResponseMark = "\n### RESPONSE ###\n" // ErrNoDumps - signal that dumps not found var ErrNoDumps = errors.New("No dumps") // Dumper - interface for dump data type Dumper interface { - Dump(params string, data string, prefix string, status int) error + Dump(params string, data string, response string, prefix string, status int) error } // FileDumper - dumps data to file system @@ -61,16 +62,20 @@ func NewDumper(path string) *FileDumper { } // Dump - dumps data to files -func (d *FileDumper) Dump(params string, data string, prefix string, status int) error { +func (d *FileDumper) Dump(params string, content string, response string, prefix string, status int) error { d.mu.Lock() defer d.mu.Unlock() err := d.checkDir(true) if err != nil { return err } + data := params + "\n" + content + if response != "" { + data += dumpResponseMark + response + } d.DumpNum++ err = ioutil.WriteFile( - path.Join(d.Path, d.dumpName(d.DumpNum, prefix, status)), []byte(params+"\n"+data), 0644, + path.Join(d.Path, d.dumpName(d.DumpNum, prefix, status)), []byte(data), 0644, ) if err != nil { log.Printf("ERROR: dump to file: %+v\n", err) @@ -112,10 +117,14 @@ func (d *FileDumper) GetDump() (string, error) { } // GetDumpData - get dump data from filesystem -func (d *FileDumper) GetDumpData(id string) (string, error) { +func (d *FileDumper) GetDumpData(id string) (data string, response string, err error) { path := d.makePath(id) s, err := ioutil.ReadFile(path) - return string(s), err + items := strings.Split(string(s), dumpResponseMark) + if len(items) > 1 { + return items[0], items[1], err + } + return items[0], "", err } // DeleteDump - get dump data from filesystem @@ -139,24 +148,23 @@ func (d *FileDumper) ProcessNextDump(sender Sender) error { if f == "" { return nil } - data, err := d.GetDumpData(f) + data, _, err := d.GetDumpData(f) if err != nil { return fmt.Errorf("Dump read error: %+v", err) } - if data == "" { - return nil - } - params := "" - lines := strings.Split(data, "\n") - if !HasPrefix(lines[0], "insert") { - params = lines[0] - data = strings.Join(lines[1:], "\n") - } - _, status, err := sender.SendQuery(&ClickhouseRequest{Params: params, Content: data}) - if err != nil { - return fmt.Errorf("server error (%+v) %+v", status, err) + if data != "" { + params := "" + lines := strings.Split(data, "\n") + if !HasPrefix(lines[0], "insert") { + params = lines[0] + data = strings.Join(lines[1:], "\n") + } + _, status, err := sender.SendQuery(&ClickhouseRequest{Params: params, Content: data}) + if err != nil { + return fmt.Errorf("server error (%+v) %+v", status, err) + } + log.Printf("INFO: dump sended: %+v\n", f) } - log.Printf("INFO: dump sended: %+v\n", f) err = d.DeleteDump(f) if err != nil { d.LockedFiles[f] = true diff --git a/dump_test.go b/dump_test.go index 1c24210..2ef3d30 100644 --- a/dump_test.go +++ b/dump_test.go @@ -15,11 +15,11 @@ func TestDump_Dump(t *testing.T) { dumper := NewDumper(dumpDir) c.Dumper = dumper c.AddServer("") - c.Dump("eee", "eee", "", 502) + c.Dump("eee", "eee", "error", "", 502) assert.True(t, c.Empty()) - buf, err := dumper.GetDumpData(dumper.dumpName(1, "", 502)) + buf, _, err := dumper.GetDumpData(dumper.dumpName(1, "", 502)) assert.Nil(t, err) - assert.Equal(t, "eee\neee", string(buf)) + assert.Equal(t, "eee\neee\n", string(buf)) sender := &fakeSender{} err = dumper.ProcessNextDump(sender) @@ -30,10 +30,10 @@ func TestDump_Dump(t *testing.T) { assert.Len(t, sender.sendQueryHistory, 1) dumper.Listen(sender, 1) - c.Dump("eee", "eee", "", 502) + c.Dump("eee", "eee", "", "", 502) time.Sleep(time.Second) err = dumper.ProcessNextDump(sender) - assert.True(t, errors.Is(err, ErrNoDumps)) + assert.Equal(t, ErrNoDumps, err) err = os.Remove(dumpDir) assert.Nil(t, err) From 56c555f94596e743b130b91b19ffceaa5850d765 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Mon, 21 Oct 2019 16:04:24 +0300 Subject: [PATCH 28/29] fix test --- dump_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dump_test.go b/dump_test.go index 2ef3d30..09e9bb3 100644 --- a/dump_test.go +++ b/dump_test.go @@ -19,7 +19,7 @@ func TestDump_Dump(t *testing.T) { assert.True(t, c.Empty()) buf, _, err := dumper.GetDumpData(dumper.dumpName(1, "", 502)) assert.Nil(t, err) - assert.Equal(t, "eee\neee\n", string(buf)) + assert.Equal(t, "eee\neee", string(buf)) sender := &fakeSender{} err = dumper.ProcessNextDump(sender) From 5d9ffdd7ab18e84d67d22a0bab131c1eb4a60436 Mon Sep 17 00:00:00 2001 From: Nikolay Pavlovich Date: Mon, 21 Oct 2019 16:17:38 +0300 Subject: [PATCH 29/29] fix test --- dump_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dump_test.go b/dump_test.go index 09e9bb3..d98a2ab 100644 --- a/dump_test.go +++ b/dump_test.go @@ -31,7 +31,7 @@ func TestDump_Dump(t *testing.T) { dumper.Listen(sender, 1) c.Dump("eee", "eee", "", "", 502) - time.Sleep(time.Second) + time.Sleep(time.Second * 2) err = dumper.ProcessNextDump(sender) assert.Equal(t, ErrNoDumps, err)