Skip to content

Commit

Permalink
Merge pull request #66 from derN3rd/master
Browse files Browse the repository at this point in the history
add log_queries option to hide sending/sent output from logs
  • Loading branch information
nikepan authored Jul 15, 2023
2 parents 4e1e60e + ccaa0f9 commit 362e707
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 25 deletions.
9 changes: 5 additions & 4 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type ClickhouseServer struct {
LastRequest time.Time
Bad bool
Client *http.Client
LogQueries bool
}

// Clickhouse - main clickhouse sender object
Expand Down Expand Up @@ -75,12 +76,12 @@ func NewClickhouse(downTimeout int, connectTimeout int, tlsServerName string, tl
}

// AddServer - add clickhouse server url
func (c *Clickhouse) AddServer(url string) {
func (c *Clickhouse) AddServer(url string, logQueries bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.Servers = append(c.Servers, &ClickhouseServer{URL: url, Client: &http.Client{
Timeout: time.Second * time.Duration(c.ConnectTimeout), Transport: c.Transport,
}})
}, LogQueries: logQueries })
}

// DumpServers - dump servers state to prometheus
Expand Down Expand Up @@ -193,15 +194,15 @@ func (srv *ClickhouseServer) SendQuery(r *ClickhouseRequest) (response string, s
if r.Params != "" {
url += "?" + r.Params
}
if r.isInsert {
if r.isInsert && srv.LogQueries {
log.Printf("INFO: sending %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query)
}
resp, err := srv.Client.Post(url, "text/plain", strings.NewReader(r.Content))
if err != nil {
srv.Bad = true
return err.Error(), http.StatusBadGateway, ErrServerIsDown
}
if r.isInsert {
if r.isInsert && srv.LogQueries {
log.Printf("INFO: sent %+v rows to %+v of %+v\n", r.Count, srv.URL, r.Query)
}
buf, _ := ioutil.ReadAll(resp.Body)
Expand Down
14 changes: 7 additions & 7 deletions clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

func TestClickhouse_GetNextServer(t *testing.T) {
c := NewClickhouse(300, 10, "", false)
c.AddServer("")
c.AddServer("http://127.0.0.1:8124")
c.AddServer("http://127.0.0.1:8125")
c.AddServer("http://127.0.0.1:8123")
c.AddServer("", true)
c.AddServer("http://127.0.0.1:8124", true)
c.AddServer("http://127.0.0.1:8125", true)
c.AddServer("http://127.0.0.1:8123", true)
s := c.GetNextServer()
assert.Equal(t, "", s.URL)
s.SendQuery(&ClickhouseRequest{})
Expand All @@ -30,7 +30,7 @@ func TestClickhouse_GetNextServer(t *testing.T) {

func TestClickhouse_Send(t *testing.T) {
c := NewClickhouse(300, 10, "", false)
c.AddServer("")
c.AddServer("", true)
c.Send(&ClickhouseRequest{})
for !c.Queue.Empty() {
time.Sleep(10)
Expand All @@ -39,7 +39,7 @@ func TestClickhouse_Send(t *testing.T) {

func TestClickhouse_SendQuery(t *testing.T) {
c := NewClickhouse(300, 10, "", false)
c.AddServer("")
c.AddServer("", true)
c.GetNextServer()
c.Servers[0].Bad = true
_, status, err := c.SendQuery(&ClickhouseRequest{})
Expand All @@ -49,7 +49,7 @@ func TestClickhouse_SendQuery(t *testing.T) {

func TestClickhouse_SendQuery1(t *testing.T) {
c := NewClickhouse(-1, 10, "", false)
c.AddServer("")
c.AddServer("", true)
c.GetNextServer()
c.Servers[0].Bad = true
s := c.GetNextServer()
Expand Down
1 change: 1 addition & 0 deletions config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"remove_query_id": true,
"dump_check_interval": 300,
"debug": false,
"log_queries": true,
"dump_dir": "dumps",
"clickhouse": {
"down_timeout": 60,
Expand Down
1 change: 1 addition & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Configuration file
"flush_count": 10000, // check by \n char
"flush_interval": 1000, // milliseconds
"debug": false, // log incoming requests
"log_queries": true, // log "Sending/sent x rows to" messages for each query
"dump_dir": "dumps", // directory for dump unsended data (if clickhouse errors)
"clickhouse": {
"down_timeout": 300, // wait if server in down (seconds)
Expand Down
2 changes: 1 addition & 1 deletion dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestDump_Dump(t *testing.T) {
dumpDir := "dumptest"
dumper := NewDumper(dumpDir)
c.Dumper = dumper
c.AddServer("")
c.AddServer("", true)
c.Dump("eee", "eee", "error", "", 502)
assert.True(t, c.Empty())
buf, _, err := dumper.GetDumpData(dumper.dumpName(1, "", 502))
Expand Down
21 changes: 11 additions & 10 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (

// Server - main server object
type Server struct {
Listen string
Collector *Collector
Debug bool
echo *echo.Echo
Listen string
Collector *Collector
Debug bool
LogQueries bool
echo *echo.Echo
}

// Status - response status struct
Expand All @@ -36,8 +37,8 @@ type Status struct {
}

// NewServer - create server
func NewServer(listen string, collector *Collector, debug bool) *Server {
return &Server{listen, collector, debug, echo.New()}
func NewServer(listen string, collector *Collector, debug bool, logQueries bool) *Server {
return &Server{listen, collector, debug, logQueries, echo.New()}
}

func (server *Server) writeHandler(c echo.Context) error {
Expand Down Expand Up @@ -113,8 +114,8 @@ func (server *Server) Shutdown(ctx context.Context) error {
}

// InitServer - run server
func InitServer(listen string, collector *Collector, debug bool) *Server {
server := NewServer(listen, collector, debug)
func InitServer(listen string, collector *Collector, debug bool, logQueries bool) *Server {
server := NewServer(listen, collector, debug, logQueries)
server.echo.POST("/", server.writeHandler)
server.echo.GET("/status", server.statusHandler)
server.echo.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
Expand Down Expand Up @@ -146,7 +147,7 @@ func RunServer(cnf Config) {
sender := NewClickhouse(cnf.Clickhouse.DownTimeout, cnf.Clickhouse.ConnectTimeout, cnf.Clickhouse.tlsServerName, cnf.Clickhouse.tlsSkipVerify)
sender.Dumper = dumper
for _, url := range cnf.Clickhouse.Servers {
sender.AddServer(url)
sender.AddServer(url, cnf.LogQueries)
}

collect := NewCollector(sender, cnf.FlushCount, cnf.FlushInterval, cnf.CleanInterval, cnf.RemoveQueryID)
Expand All @@ -155,7 +156,7 @@ func RunServer(cnf Config) {
signals := make(chan os.Signal)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

srv := InitServer(cnf.Listen, collect, cnf.Debug)
srv := InitServer(cnf.Listen, collect, cnf.Debug, cnf.LogQueries)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
6 changes: 3 additions & 3 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func TestRunServer(t *testing.T) {
cnf, _ := ReadConfig("wrong_config.json")
collector := NewCollector(&fakeSender{}, 1000, 1000, 0, true)
server := InitServer("", collector, false)
server := InitServer("", collector, false, true)
go server.Start(cnf)
server.echo.POST("/", server.writeHandler)

Expand Down Expand Up @@ -107,8 +107,8 @@ func TestServer_MultiServer(t *testing.T) {
defer s2.Close()

sender := NewClickhouse(10, 10, "", false)
sender.AddServer(s1.URL)
sender.AddServer(s2.URL)
sender.AddServer(s1.URL, true)
sender.AddServer(s2.URL, true)
collect := NewCollector(sender, 1000, 1000, 0, true)
collect.AddTable("test")
collect.Push("eee", "eee")
Expand Down
2 changes: 2 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
DumpCheckInterval int `json:"dump_check_interval"`
DumpDir string `json:"dump_dir"`
Debug bool `json:"debug"`
LogQueries bool `json:"log_queries"`
MetricsPrefix string `json:"metrics_prefix"`
UseTLS bool `json:"use_tls"`
TLSCertFile string `json:"tls_cert_file"`
Expand Down Expand Up @@ -102,6 +103,7 @@ func ReadConfig(configFile string) (Config, error) {
readEnvInt("CLICKHOUSE_CONNECT_TIMEOUT", &cnf.Clickhouse.ConnectTimeout)
readEnvBool("CLICKHOUSE_INSECURE_TLS_SKIP_VERIFY", &cnf.Clickhouse.tlsSkipVerify)
readEnvString("METRICS_PREFIX", &cnf.MetricsPrefix)
readEnvBool("LOG_QUERIES", &cnf.LogQueries)

serversList := os.Getenv("CLICKHOUSE_SERVERS")
if serversList != "" {
Expand Down

0 comments on commit 362e707

Please sign in to comment.