diff --git a/clickhouse.go b/clickhouse.go index 0625a0e..10c80e6 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -20,6 +20,7 @@ type ClickhouseServer struct { LastRequest time.Time Bad bool Client *http.Client + LogQueries bool } // Clickhouse - main clickhouse sender object @@ -75,12 +76,12 @@ func NewClickhouse(downTimeout int, connectTimeout int, tlsServerName string, tl } // AddServer - add clickhouse server url -func (c *Clickhouse) AddServer(url string) { +func (c *Clickhouse) AddServer(url string, logQueries bool) { c.mu.Lock() defer c.mu.Unlock() c.Servers = append(c.Servers, &ClickhouseServer{URL: url, Client: &http.Client{ Timeout: time.Second * time.Duration(c.ConnectTimeout), Transport: c.Transport, - }}) + }, LogQueries: logQueries }) } // DumpServers - dump servers state to prometheus @@ -193,7 +194,7 @@ func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, s if r.Params != "" { url += "?" + r.Params } - if r.isInsert { + if r.isInsert && srv.LogQueries { log.Printf("INFO: sending %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query) } resp, err := srv.Client.Post(url, "text/plain", strings.NewReader(r.Content)) @@ -201,7 +202,7 @@ func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, s srv.Bad = true return err.Error(), http.StatusBadGateway, ErrServerIsDown } - if r.isInsert { + if r.isInsert && srv.LogQueries { log.Printf("INFO: sent %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query) } buf, _ := ioutil.ReadAll(resp.Body) diff --git a/clickhouse_test.go b/clickhouse_test.go index 4a21743..575296c 100644 --- a/clickhouse_test.go +++ b/clickhouse_test.go @@ -11,10 +11,10 @@ import ( func TestClickhouse_GetNextServer(t *testing.T) { c := NewClickhouse(300, 10, "", false) - c.AddServer("") - c.AddServer("http://127.0.0.1:8124") - c.AddServer("http://127.0.0.1:8125") - c.AddServer("http://127.0.0.1:8123") + c.AddServer("", true) + c.AddServer("http://127.0.0.1:8124", true) + c.AddServer("http://127.0.0.1:8125", true) + c.AddServer("http://127.0.0.1:8123", true) s := c.GetNextServer() assert.Equal(t, "", s.URL) s.SendQuery(&ClickhouseRequest{}) @@ -30,7 +30,7 @@ func TestClickhouse_GetNextServer(t *testing.T) { func TestClickhouse_Send(t *testing.T) { c := NewClickhouse(300, 10, "", false) - c.AddServer("") + c.AddServer("", true) c.Send(&ClickhouseRequest{}) for !c.Queue.Empty() { time.Sleep(10) @@ -39,7 +39,7 @@ func TestClickhouse_Send(t *testing.T) { func TestClickhouse_SendQuery(t *testing.T) { c := NewClickhouse(300, 10, "", false) - c.AddServer("") + c.AddServer("", true) c.GetNextServer() c.Servers[0].Bad = true _, status, err := c.SendQuery(&ClickhouseRequest{}) @@ -49,7 +49,7 @@ func TestClickhouse_SendQuery(t *testing.T) { func TestClickhouse_SendQuery1(t *testing.T) { c := NewClickhouse(-1, 10, "", false) - c.AddServer("") + c.AddServer("", true) c.GetNextServer() c.Servers[0].Bad = true s := c.GetNextServer() diff --git a/config.sample.json b/config.sample.json index 6801e03..23ac9df 100644 --- a/config.sample.json +++ b/config.sample.json @@ -6,6 +6,7 @@ "remove_query_id": true, "dump_check_interval": 300, "debug": false, + "log_queries": true, "dump_dir": "dumps", "clickhouse": { "down_timeout": 60, diff --git a/doc.go b/doc.go index 9043fd8..0c949b7 100644 --- a/doc.go +++ b/doc.go @@ -45,6 +45,7 @@ Configuration file "flush_count": 10000, // check by \n char "flush_interval": 1000, // milliseconds "debug": false, // log incoming requests + "log_queries": true, // log "Sending/sent x rows to" messages for each query "dump_dir": "dumps", // directory for dump unsended data (if clickhouse errors) "clickhouse": { "down_timeout": 300, // wait if server in down (seconds) diff --git a/dump_test.go b/dump_test.go index 46285fd..6d87b56 100644 --- a/dump_test.go +++ b/dump_test.go @@ -14,7 +14,7 @@ func TestDump_Dump(t *testing.T) { dumpDir := "dumptest" dumper := NewDumper(dumpDir) c.Dumper = dumper - c.AddServer("") + c.AddServer("", true) c.Dump("eee", "eee", "error", "", 502) assert.True(t, c.Empty()) buf, _, err := dumper.GetDumpData(dumper.dumpName(1, "", 502)) diff --git a/server.go b/server.go index 0292d88..df45553 100644 --- a/server.go +++ b/server.go @@ -21,10 +21,11 @@ import ( // Server - main server object type Server struct { - Listen string - Collector *Collector - Debug bool - echo *echo.Echo + Listen string + Collector *Collector + Debug bool + LogQueries bool + echo *echo.Echo } // Status - response status struct @@ -36,8 +37,8 @@ type Status struct { } // NewServer - create server -func NewServer(listen string, collector *Collector, debug bool) *Server { - return &Server{listen, collector, debug, echo.New()} +func NewServer(listen string, collector *Collector, debug bool, logQueries bool) *Server { + return &Server{listen, collector, debug, logQueries, echo.New()} } func (server *Server) writeHandler(c echo.Context) error { @@ -113,8 +114,8 @@ func (server *Server) Shutdown(ctx context.Context) error { } // InitServer - run server -func InitServer(listen string, collector *Collector, debug bool) *Server { - server := NewServer(listen, collector, debug) +func InitServer(listen string, collector *Collector, debug bool, logQueries bool) *Server { + server := NewServer(listen, collector, debug, logQueries) server.echo.POST("/", server.writeHandler) server.echo.GET("/status", server.statusHandler) server.echo.GET("/metrics", echo.WrapHandler(promhttp.Handler())) @@ -146,7 +147,7 @@ func RunServer(cnf Config) { sender := NewClickhouse(cnf.Clickhouse.DownTimeout, cnf.Clickhouse.ConnectTimeout, cnf.Clickhouse.tlsServerName, cnf.Clickhouse.tlsSkipVerify) sender.Dumper = dumper for _, url := range cnf.Clickhouse.Servers { - sender.AddServer(url) + sender.AddServer(url, cnf.LogQueries) } collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval, cnf.CleanInterval, cnf.RemoveQueryID) @@ -155,7 +156,7 @@ func RunServer(cnf Config) { signals := make(chan os.Signal) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - srv := InitServer(cnf.Listen, collect, cnf.Debug) + srv := InitServer(cnf.Listen, collect, cnf.Debug, cnf.LogQueries) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/server_test.go b/server_test.go index 0b1d38d..c383f7c 100644 --- a/server_test.go +++ b/server_test.go @@ -20,7 +20,7 @@ import ( func TestRunServer(t *testing.T) { cnf, _ := ReadConfig("wrong_config.json") collector := NewCollector(&fakeSender{}, 1000, 1000, 0, true) - server := InitServer("", collector, false) + server := InitServer("", collector, false, true) go server.Start(cnf) server.echo.POST("/", server.writeHandler) @@ -107,8 +107,8 @@ func TestServer_MultiServer(t *testing.T) { defer s2.Close() sender := NewClickhouse(10, 10, "", false) - sender.AddServer(s1.URL) - sender.AddServer(s2.URL) + sender.AddServer(s1.URL, true) + sender.AddServer(s2.URL, true) collect := NewCollector(sender, 1000, 1000, 0, true) collect.AddTable("test") collect.Push("eee", "eee") diff --git a/utils.go b/utils.go index da9e25d..0ee6a6b 100644 --- a/utils.go +++ b/utils.go @@ -29,6 +29,7 @@ type Config struct { DumpCheckInterval int `json:"dump_check_interval"` DumpDir string `json:"dump_dir"` Debug bool `json:"debug"` + LogQueries bool `json:"log_queries"` MetricsPrefix string `json:"metrics_prefix"` UseTLS bool `json:"use_tls"` TLSCertFile string `json:"tls_cert_file"` @@ -102,6 +103,7 @@ func ReadConfig(configFile string) (Config, error) { readEnvInt("CLICKHOUSE_CONNECT_TIMEOUT", &cnf.Clickhouse.ConnectTimeout) readEnvBool("CLICKHOUSE_INSECURE_TLS_SKIP_VERIFY", &cnf.Clickhouse.tlsSkipVerify) readEnvString("METRICS_PREFIX", &cnf.MetricsPrefix) + readEnvBool("LOG_QUERIES", &cnf.LogQueries) serversList := os.Getenv("CLICKHOUSE_SERVERS") if serversList != "" {