-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathslowql.go
155 lines (133 loc) · 3.48 KB
/
slowql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Package slowql provides everything needed to parse slow query logs from
// different databases (such as MySQL, MariaDB).
// Along with a parser, it offers a simple API with a few functions that give
// you everything needed to work with your slow queries.
package slowql
import (
"bufio"
"io"
"strings"
"time"
"github.com/devops-works/slowql/database/mariadb"
"github.com/devops-works/slowql/database/mysql"
"github.com/devops-works/slowql/query"
"github.com/devops-works/slowql/server"
"github.com/sirupsen/logrus"
)
// Kind identifies the database flavour whose slow query log is being parsed.
type Kind int

// Values of Kind, numbered consecutively from zero.
const (
	// Unknown is the zero value of Kind.
	Unknown = Kind(iota)
	// MySQL designates a MySQL slow query log.
	MySQL
	// MariaDB designates a MariaDB slow query log.
	MariaDB
	// PXC designates a PXC slow query log.
	PXC
)
// Database is the parser interface. It lists the operations a
// database-specific parser must offer so that a Parser can drive it.
type Database interface {
	// ParseServerMeta is handed the channel on which the raw server header
	// lines are delivered.
	ParseServerMeta(servermeta chan []string)

	// ParseBlocks is handed the channel on which raw query blocks (one slice
	// of lines per block) are delivered.
	ParseBlocks(rawBlocks chan []string)

	// GetServerMeta returns information about the SQL server in use.
	GetServerMeta() server.Server
}
// Parser holds a slowql parser: the database-specific implementation and the
// channels that link it to the scanning goroutine and to the caller.
type Parser struct {
// db is the database-specific parser selected by NewParser.
db Database
// waitingList is handed to the database parser's constructor and is drained
// by GetNext.
waitingList chan query.Query
// rawBlocks carries the blocks of lines produced by scan and is passed to
// db.ParseBlocks.
rawBlocks chan []string
// servermeta carries the first lines of the log, sent once by scan, and is
// passed to db.ParseServerMeta.
servermeta chan []string
}
// NewParser returns a new parser depending on the desired kind.
//
// k selects the database implementation: MySQL and PXC share the mysql
// parser, MariaDB uses the mariadb parser. r is the slow query log to read.
//
// NewParser starts one goroutine that scans r and one that runs the database
// parser's ParseBlocks; ParseServerMeta is called synchronously before the
// latter is started.
//
// It panics if k is not one of MySQL, PXC or MariaDB.
func NewParser(k Kind, r io.Reader) Parser {
	var p Parser
	p.rawBlocks = make(chan []string, 4096)
	p.servermeta = make(chan []string)
	p.waitingList = make(chan query.Query, 4096)

	// Pick the implementation before starting any goroutine: with an
	// unsupported kind p.db would stay nil and the first call on it would
	// panic with a nil dereference, after the scanning goroutine had already
	// been launched. Fail early with an explicit message instead.
	switch k {
	case MySQL, PXC:
		p.db = mysql.New(p.waitingList)
	case MariaDB:
		p.db = mariadb.New(p.waitingList)
	default:
		panic("slowql: unsupported database kind")
	}

	go scan(*bufio.NewScanner(r), p.rawBlocks, p.servermeta)

	// servermeta is unbuffered, so this call is what pairs with the header
	// send performed by scan.
	p.db.ParseServerMeta(p.servermeta)
	go p.db.ParseBlocks(p.rawBlocks)

	// This is gross, but it gives the goroutines a head start so that some
	// queries are likely to be parsed already when the caller starts using
	// the package's functions.
	time.Sleep(10 * time.Millisecond)

	return p
}
// GetNext returns the next query in line.
//
// It waits up to two seconds for a query to arrive on the waiting list. If
// none arrives in time, the waiting list is closed and the zero value of
// query.Query is returned; once the list is closed, later calls return the
// zero value immediately.
func (p *Parser) GetNext() query.Query {
	// Use an explicit timer rather than time.After so that it can be stopped
	// as soon as a query is received, instead of staying pending for the full
	// two seconds after every successful call.
	timeout := time.NewTimer(2 * time.Second)
	defer timeout.Stop()

	var q query.Query
	select {
	case q = <-p.waitingList:
	case <-timeout.C:
		// NOTE(review): the channel is closed from the receiving side. If
		// whatever feeds waitingList is merely slow and sends after this
		// point, that send will panic — confirm the producer is done by then.
		close(p.waitingList)
	}
	return q
}
// GetServerMeta returns the server meta information, as reported by the
// underlying database parser.
func (p *Parser) GetServerMeta() server.Server {
	meta := p.db.GetServerMeta()
	return meta
}
// scan reads the log line by line from s and splits it into raw blocks.
//
// The first three lines are sent as a single slice on servermeta. The
// remaining lines are grouped into blocks: a block is a run of lines starting
// with "#" followed by the lines that do not start with "#", up to the next
// "#" line. Lines containing "SET timestamp" are dropped. Each block is sent
// on rawBlocks, which is closed once the input is exhausted.
//
// A scanner error ends the scan; it is logged, and whatever block was being
// accumulated is still sent.
func scan(s bufio.Scanner, rawBlocks, servermeta chan []string) {
	// Start with a 64 KiB buffer and allow the scanner to grow it up to
	// 1 MiB; by default bufio.MaxScanTokenSize (65536) is the limit.
	s.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	// Parse the server information. The result of Scan is deliberately not
	// checked: if the input has fewer than three lines the missing entries
	// are empty strings, so the receiver always gets exactly three elements.
	header := make([]string, 0, 3)
	for i := 0; i < 3; i++ {
		s.Scan()
		header = append(header, s.Text())
	}
	servermeta <- header

	var bloc []string
	// inQuery is true while the last kept line was a request line, i.e. the
	// current bloc already holds its header and at least one request line.
	inQuery := false

	for s.Scan() {
		line := s.Text()

		// Drop useless lines
		if strings.Contains(line, "SET timestamp") {
			continue
		}

		if strings.HasPrefix(line, "#") {
			// A header line right after a request line means a new bloc is
			// starting: send the previous one first.
			if inQuery && len(bloc) > 0 {
				rawBlocks <- bloc
				bloc = nil
			}
			inQuery = false
		} else {
			// Request line, belonging to the current bloc.
			inQuery = true
		}

		bloc = append(bloc, line)
	}

	// In case of error, log it
	if err := s.Err(); err != nil {
		logrus.Error(err)
	}

	// Send the last bloc, unless the input held nothing after the header.
	if len(bloc) > 0 {
		rawBlocks <- bloc
	}
	close(rawBlocks)
}