Skip to content

Commit

Permalink
Merge pull request #45 from brokercap/v1.1.x
Browse files Browse the repository at this point in the history
V1.1.x
  • Loading branch information
jc3wish authored Apr 18, 2020
2 parents ec48f94 + 3b074db commit ed8a643
Show file tree
Hide file tree
Showing 22 changed files with 446 additions and 98 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ language: go
go:
- "1.12"
- "1.13"
- "1.14"

before_install:
- go get github.com/gmallard/stompngo
Expand Down
93 changes: 73 additions & 20 deletions Bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@ limitations under the License.
package main

import (
"log"
"github.com/brokercap/Bifrost/plugin"
"github.com/brokercap/Bifrost/manager"
"github.com/brokercap/Bifrost/config"
"flag"
"os"
"os/signal"
"syscall"
"time"
"fmt"
"github.com/brokercap/Bifrost/config"
"github.com/brokercap/Bifrost/manager"
"github.com/brokercap/Bifrost/plugin"
"github.com/brokercap/Bifrost/server"
"io"
"sync"
"io/ioutil"
"fmt"
"strings"
"log"
"os"
"os/exec"
"os/signal"
"path/filepath"
"runtime"
"runtime/debug"
"github.com/brokercap/Bifrost/server"
"strconv"
"strings"
"sync"
"syscall"
"time"
)

var l sync.Mutex
Expand Down Expand Up @@ -124,9 +125,34 @@ func main() {
}

if *BifrostDaemon == "true"{
/*
file := execDir+"/pid.log"
logFile, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0700)
if err != nil {
panic(err)
}
loger := log.New(logFile, "[qSkiptool]",log.LstdFlags | log.Lshortfile | log.LUTC)
loger.Println("os.Getppid():",os.Getppid())
loger.Println("os.Getgid():",os.Getgid())
loger.Println("os.Getpid():",os.Getpid())
*/
isDaemo := false
if os.Getppid() != 1{
// 因为有一些桌面系统,父进程开了子进程之后,父进程退出之后,并不是由 pid=1 的 systemd 进程接管,可能有些系统给每个桌面帐号重新分配了一下systemd 进程
// 这里每去判断 一下是不是 systemd 进程名,如果是的话,也认为是父进程被退出了
cmdString := "ps -ef|grep "+strconv.Itoa(os.Getppid())+" | grep systemd|grep -v grep"
resultBytes,err := CmdShell(cmdString)
if err == nil && resultBytes != nil && string(resultBytes) != ""{
isDaemo = true
}
}else {
isDaemo = true
}
if !isDaemo {
filePath,_:=filepath.Abs(os.Args[0]) //将命令行参数中执行文件路径转换成可用路径
args:=append([]string{filePath},os.Args[1:]...)
fmt.Println(filePath)
fmt.Println(args)
os.StartProcess(filePath,args,&os.ProcAttr{Files:[]*os.File{os.Stdin,os.Stdout,os.Stderr}})
return
}else{
Expand Down Expand Up @@ -251,19 +277,46 @@ func initLog(){
}

func WritePid(){
f, err2 := os.OpenFile(*BifrostPid, os.O_CREATE|os.O_RDWR, 0700) //打开文件
if err2 !=nil{
log.Println("Open BifrostPid Error; File:",*BifrostPid,"; Error:",err2)
var err error
var pidFileFd *os.File
pidFileFd, err = os.OpenFile(*BifrostPid, os.O_CREATE|os.O_RDWR, 0700) //打开文件
if err !=nil{
log.Println("Open BifrostPid Error; File:",*BifrostPid,"; Error:",err)
os.Exit(1)
return
}
pidContent, err2 := ioutil.ReadAll(f)
defer pidFileFd.Close()
pidContent, err2 := ioutil.ReadAll(pidFileFd)
if string(pidContent) != ""{
log.Println("Birostd server quit without updating PID file ; File:",*BifrostPid,"; Error:",err2)
os.Exit(1)
ExitBool := true
cmdString := "ps -ef|grep "+string(pidContent)+" | grep "+filepath.Base(os.Args[0]+"|grep -v grep")
resultBytes,err := CmdShell(cmdString)
// err 不为 nil 则代表没有grep 到进程,可以认为有可能被 kill -9 等操作了
if err != nil && resultBytes != nil{
ExitBool = false
}else{
log.Println(cmdString," result:",string(resultBytes)," err:",err,)
}
if ExitBool {
log.Println("Birostd server quit without updating PID file ; File:", *BifrostPid, "; Error:", err2)
os.Exit(1)
}
}
os.Truncate(*BifrostPid, 0)
pidFileFd.Seek(0,0)
io.WriteString(pidFileFd,fmt.Sprint(os.Getpid()))
}

func CmdShell(cmdString string)([]byte,error){
switch runtime.GOOS {
case "linux","darwin","freebsd":
cmd := exec.Command("/bin/bash", "-c", cmdString)
return cmd.Output()
break
default:
break
}
defer f.Close()
io.WriteString(f, fmt.Sprint(os.Getpid()))
return nil,fmt.Errorf(runtime.GOOS+" not supported")
}

func doSaveDbInfo(){
Expand Down
14 changes: 12 additions & 2 deletions Bristol/mysql/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,15 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
parser.conn.Close()
}
errs = fmt.Errorf(fmt.Sprint(err))
log.Println(err)
}
}()
if parser.connStatus == 0 {
parser.initConn()
}
//set dbAndTable Name tableId
parser.tableNameMap[database+"."+tablename] = tableId
sql := "SELECT COLUMN_NAME,COLUMN_KEY,COLUMN_TYPE,CHARACTER_SET_NAME,COLLATION_NAME,NUMERIC_SCALE,EXTRA,COLUMN_DEFAULT,DATA_TYPE FROM information_schema.columns WHERE table_schema='" + database + "' AND table_name='" + tablename + "' ORDER BY `ORDINAL_POSITION` ASC"
sql := "SELECT COLUMN_NAME,COLUMN_KEY,COLUMN_TYPE,CHARACTER_SET_NAME,COLLATION_NAME,NUMERIC_SCALE,EXTRA,COLUMN_DEFAULT,DATA_TYPE,CHARACTER_OCTET_LENGTH FROM information_schema.columns WHERE table_schema='" + database + "' AND table_name='" + tablename + "' ORDER BY `ORDINAL_POSITION` ASC"
stmt, err := parser.conn.Prepare(sql)
p := make([]driver.Value, 0)
rows, err := stmt.Query(p)
Expand All @@ -231,7 +232,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
ColumnSchemaTypeList: make([]*column_schema_type,0),
}
for {
dest := make([]driver.Value, 9, 9)
dest := make([]driver.Value, 10, 10)
err := rows.Next(dest)
if err != nil {
break
Expand All @@ -245,6 +246,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
var enum_values, set_values []string
var COLUMN_DEFAULT string
var DATA_TYPE string
var CHARACTER_OCTET_LENGTH uint64

COLUMN_NAME = dest[0].(string)
COLUMN_KEY = dest[1].(string)
Expand Down Expand Up @@ -310,6 +312,13 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
} else {
set_values = make([]string, 0)
}

if dest[9] == nil{
CHARACTER_OCTET_LENGTH = 0
}else{
CHARACTER_OCTET_LENGTH = dest[9].(uint64)
}

tableInfo.ColumnSchemaTypeList = append(tableInfo.ColumnSchemaTypeList, &column_schema_type{
COLUMN_NAME: COLUMN_NAME,
COLUMN_KEY: COLUMN_KEY,
Expand All @@ -325,6 +334,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
NUMERIC_SCALE:NUMERIC_SCALE,
COLUMN_DEFAULT:COLUMN_DEFAULT,
DATA_TYPE:DATA_TYPE,
CHARACTER_OCTET_LENGTH:CHARACTER_OCTET_LENGTH,
})

if strings.ToUpper(COLUMN_KEY) == "PRI"{
Expand Down
8 changes: 6 additions & 2 deletions Bristol/mysql/binlog_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@ import (
)

func CheckBinlogIsRight(dbUri string,filename string, position uint32) error{
// position == 4 是 Format_desc 事件
if position == 4{
return nil
}
db := NewConnect(dbUri)
defer db.Close()
sql := "show binlog events IN '"+filename+"' FROM "+ strconv.FormatInt(int64(position),10) +" LIMIT 1"
stmt,err := db.Prepare(sql)
if err !=nil{
if err != nil{
return err
}
defer stmt.Close()
rows, err := stmt.Query([]driver.Value{})
defer rows.Close()
if err != nil {
return err
}
defer rows.Close()
var returnErr error
for {
dest := make([]driver.Value, 6, 6)
Expand Down
30 changes: 27 additions & 3 deletions Bristol/mysql/event_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,33 @@ func (parser *eventParser) parseEventRow(buf *bytes.Buffer, tableMap *TableMapEv

case FIELD_TYPE_STRING:
var length int
var b byte
b, e = buf.ReadByte()
length = int(b)
if tableSchemaMap[i].CHARACTER_OCTET_LENGTH > 255 {
var short uint16
e = binary.Read(buf, binary.LittleEndian, &short)
length = int(short)
}else{
var b byte
b, e = buf.ReadByte()
length = int(b)
}
//row[column_name] = string(buf.Next(length+1))
/*
log.Println("======================")
log.Println("column_name:", column_name," length:",length)
//log.Println("name:",tableMap.columnMetaData[i].name)
log.Println("size:",tableMap.columnMetaData[i].size)
log.Println("precision:",tableMap.columnMetaData[i].precision)
log.Println("max_length:",tableMap.columnMetaData[i].max_length)
log.Println("length_size:",tableMap.columnMetaData[i].length_size)
log.Println("fsp:",tableMap.columnMetaData[i].fsp)
log.Println("decimals:",tableMap.columnMetaData[i].decimals)
log.Println("unsigned:",tableMap.columnMetaData[i].unsigned)
log.Println("column_type:",tableMap.columnMetaData[i].column_type)
log.Println("tableMap.columnMetaData[i]:",tableMap.columnMetaData[i])
*/
row[column_name] = string(buf.Next(length))
//log.Println("column_name: ",column_name," == ",row[column_name])

break

case FIELD_TYPE_ENUM:
Expand Down Expand Up @@ -558,6 +581,7 @@ func (parser *eventParser) parseEventRow(buf *bytes.Buffer, tableMap *TableMapEv
default:
return nil, fmt.Errorf("Unknown FieldType %d", tableMap.columnTypes[i])
}
//log.Println("column_name:",column_name," row[column_name]:",row[column_name])
if e != nil {
log.Println("lastField err:",column_name,tableMap.columnMetaData[i].column_type,e)
return nil, e
Expand Down
1 change: 1 addition & 0 deletions Bristol/mysql/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type column_schema_type struct {
auto_increment bool
COLUMN_DEFAULT string
DATA_TYPE string
CHARACTER_OCTET_LENGTH uint64
}

type MysqlConnection interface {
Expand Down
29 changes: 13 additions & 16 deletions Bristol/mysql/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,27 +711,31 @@ func (stmt mysqlStmt) buildExecutePacket(args *[]driver.Value) (e error) {

// Reset packet-sequence
stmt.mc.sequence = 0

pktLen := 1 + 4 + 1 + 4 + (stmt.paramCount+7)/8 + 1 + argsLen*2
paramValues := make([][]byte, 0, argsLen)
paramTypes := make([]byte, 0, argsLen*2)
bitMask := uint64(0)

var i, valLen int
var pv reflect.Value

var nullMask []byte
maskLen := (argsLen+7)/8
nullMask = make([]byte,maskLen)
for i := 0; i < maskLen; i++ {
nullMask[i] = 0
}
for i = 0; i < stmt.paramCount; i++ {
// build nullBitMap
if (*args)[i] == nil {
bitMask += 1 << uint(i)
}

// cache types and values
switch (*args)[i].(type) {
case nil:
nullMask[i/8] |= 1 << (uint(i) & 7)
paramTypes = append(paramTypes, []byte{
byte(FIELD_TYPE_NULL),
0x0}...)
continue
}

// cache types and values
switch (*args)[i].(type) {
case []byte:
paramTypes = append(paramTypes, []byte{byte(FIELD_TYPE_STRING), 0x0}...)
val := (*args)[i].([]byte)
Expand Down Expand Up @@ -808,15 +812,8 @@ func (stmt mysqlStmt) buildExecutePacket(args *[]driver.Value) (e error) {
// iteration_count [4 bytes]
data = append(data, uint32ToBytes(1)...)

// append nullBitMap [(param_count+7)/8 bytes]
if stmt.paramCount > 0 {
// Convert bitMask to bytes
nullBitMap := make([]byte, (stmt.paramCount+7)/8)
for i = 0; i < len(nullBitMap); i++ {
nullBitMap[i] = byte(bitMask >> uint(i*8))
}

data = append(data, nullBitMap...)
data = append(data, nullMask...)
}

// newParameterBoundFlag 1 [1 byte]
Expand Down
2 changes: 1 addition & 1 deletion Bristol/mysql/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package mysql

const VERSION = "v1.1.0"
const VERSION = "v1.1.2-beta.01"
12 changes: 12 additions & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
v1.1.1-release 2020-04-18
===========================
1. clickhouse插件delete合并一条sql执行
2. 全量任务查询sql查询分页语句通过子查询性能优化
3. 启动的时候获取正确位点新增部分日志
4. 修复mysql插件在数据库有存在特殊符号的时候报语句错误的bug
5. 修复mysql写入字段名是保留字段报错的bug
6. char存储字节大于255解析错误的bug
7. Mysql插件新增NullTransferDefault配置,可强制将NULL转成对应的默认值
8. 新复因mysql连接异常造成的崩溃问题
9. 修复多个null值的情况下,报Incorrect arguments to mysqld_stmt_execute 错误

v1.1.0-release 2019-12-08
===========================
1. redis 配置存储去掉了一个重复的 bifrost 字符串
Expand Down
2 changes: 1 addition & 1 deletion config/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ limitations under the License.
*/
package config

const VERSION = "v1.1.0-release"
const VERSION = "v1.1.1-release"
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/brokercap/Bifrost

go 1.14

require (
github.com/ClickHouse/clickhouse-go v1.3.12
github.com/Shopify/sarama v1.23.0
Expand Down Expand Up @@ -35,5 +37,3 @@ replace (
gopkg.in/vmihailenco/msgpack.v2 => github.com/vmihailenco/msgpack v2.9.1+incompatible
gopkg.in/yaml.v2 => github.com/go-yaml/yaml v0.0.0-20181115110504-51d6538a90f8
)

go 1.13
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
Loading

0 comments on commit ed8a643

Please sign in to comment.