From 9cbf831257d3343dd4412268622435cea64230fc Mon Sep 17 00:00:00 2001 From: jc3wish Date: Fri, 10 Jan 2020 22:20:36 +0800 Subject: [PATCH 01/20] jc3wish v1.1.1 --- config/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/version.go b/config/version.go index d0117449..e269171a 100755 --- a/config/version.go +++ b/config/version.go @@ -15,4 +15,4 @@ limitations under the License. */ package config -const VERSION = "v1.1.0-release" +const VERSION = "v1.1.1-beta.01" From 7ecca5ab165129917bf9f45b2af2bc07307637b5 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Fri, 10 Jan 2020 22:23:00 +0800 Subject: [PATCH 02/20] jc3wish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1.kill -9 或者断电等异常退出后,自动重启不受旧pid文件影响 2. 部门桌面系统daemon运行的时候,子进程非pid=1 systemd 进程接管,新增了是否systemd判断 --- Bifrost.go | 93 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/Bifrost.go b/Bifrost.go index 88093c00..92a83a8e 100755 --- a/Bifrost.go +++ b/Bifrost.go @@ -16,25 +16,26 @@ limitations under the License. package main import ( - "log" - "github.com/brokercap/Bifrost/plugin" - "github.com/brokercap/Bifrost/manager" - "github.com/brokercap/Bifrost/config" "flag" - "os" - "os/signal" - "syscall" - "time" + "fmt" + "github.com/brokercap/Bifrost/config" + "github.com/brokercap/Bifrost/manager" + "github.com/brokercap/Bifrost/plugin" + "github.com/brokercap/Bifrost/server" "io" - "sync" "io/ioutil" - "fmt" - "strings" + "log" + "os" + "os/exec" + "os/signal" "path/filepath" "runtime" "runtime/debug" - "github.com/brokercap/Bifrost/server" "strconv" + "strings" + "sync" + "syscall" + "time" ) var l sync.Mutex @@ -124,9 +125,34 @@ func main() { } if *BifrostDaemon == "true"{ + /* + file := execDir+"/pid.log" + logFile, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0700) + if err != nil { + panic(err) + } + loger := log.New(logFile, "[qSkiptool]",log.LstdFlags | log.Lshortfile | log.LUTC) + loger.Println("os.Getppid():",os.Getppid()) + loger.Println("os.Getgid():",os.Getgid()) + loger.Println("os.Getpid():",os.Getpid()) + */ + isDaemo := false if os.Getppid() != 1{ + // 因为有一些桌面系统,父进程开了子进程之后,父进程退出之后,并不是由 pid=1 的 systemd 进程接管,可能有些系统给每个桌面帐号重新分配了一下systemd 进程 + // 这里每去判断 一下是不是 systemd 进程名,如果是的话,也认为是父进程被退出了 + cmdString := "ps -ef|grep "+strconv.Itoa(os.Getppid())+" | grep systemd|grep -v grep" + resultBytes,err := CmdShell(cmdString) + if err == nil && resultBytes != nil && string(resultBytes) != ""{ + isDaemo = true + } + }else { + isDaemo = true + } + if !isDaemo { filePath,_:=filepath.Abs(os.Args[0]) //将命令行参数中执行文件路径转换成可用路径 args:=append([]string{filePath},os.Args[1:]...) + fmt.Println(filePath) + fmt.Println(args) os.StartProcess(filePath,args,&os.ProcAttr{Files:[]*os.File{os.Stdin,os.Stdout,os.Stderr}}) return }else{ @@ -251,19 +277,46 @@ func initLog(){ } func WritePid(){ - f, err2 := os.OpenFile(*BifrostPid, os.O_CREATE|os.O_RDWR, 0700) //打开文件 - if err2 !=nil{ - log.Println("Open BifrostPid Error; File:",*BifrostPid,"; Error:",err2) + var err error + var pidFileFd *os.File + pidFileFd, err = os.OpenFile(*BifrostPid, os.O_CREATE|os.O_RDWR, 0700) //打开文件 + if err !=nil{ + log.Println("Open BifrostPid Error; File:",*BifrostPid,"; Error:",err) os.Exit(1) return } - pidContent, err2 := ioutil.ReadAll(f) + defer pidFileFd.Close() + pidContent, err2 := ioutil.ReadAll(pidFileFd) if string(pidContent) != ""{ - log.Println("Birostd server quit without updating PID file ; File:",*BifrostPid,"; Error:",err2) - os.Exit(1) + ExitBool := true + cmdString := "ps -ef|grep "+string(pidContent)+" | grep "+filepath.Base(os.Args[0]+"|grep -v grep") + resultBytes,err := CmdShell(cmdString) + // err 不为 nil 则代表没有grep 到进程,可以认为有可能被 kill -9 等操作了 + if err != nil && resultBytes != nil{ + ExitBool = false + }else{ + log.Println(cmdString," result:",string(resultBytes)," err:",err,) + } + if ExitBool { + log.Println("Birostd server quit without updating PID file ; File:", *BifrostPid, "; Error:", err2) + os.Exit(1) + } + } + os.Truncate(*BifrostPid, 0) + pidFileFd.Seek(0,0) + io.WriteString(pidFileFd,fmt.Sprint(os.Getpid())) +} + +func CmdShell(cmdString string)([]byte,error){ + switch runtime.GOOS { + case "linux","darwin","freebsd": + cmd := exec.Command("/bin/bash", "-c", cmdString) + return cmd.Output() + break + default: + break } - defer f.Close() - io.WriteString(f, fmt.Sprint(os.Getpid())) + return nil,fmt.Errorf(runtime.GOOS+" not supported") } func doSaveDbInfo(){ From c40aaf7960afb889ef7133ad619254fa38b0f259 Mon Sep 17 00:00:00 2001 From: zero Date: Wed, 15 Jan 2020 23:44:28 +0800 Subject: [PATCH 03/20] =?UTF-8?q?clickhouse=E6=8F=92=E4=BB=B6delete?= =?UTF-8?q?=E5=90=88=E5=B9=B6=E4=B8=80=E6=9D=A1sql=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin/clickhouse/src/clickhouse.go | 45 +++++++++++++++++++---- plugin/clickhouse/src/clickhouse_test.go | 46 ++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 7 deletions(-) diff --git a/plugin/clickhouse/src/clickhouse.go b/plugin/clickhouse/src/clickhouse.go index 09bd4c2c..ff902313 100644 --- a/plugin/clickhouse/src/clickhouse.go +++ b/plugin/clickhouse/src/clickhouse.go @@ -14,8 +14,8 @@ import ( ) -const VERSION = "v1.1.0" -const BIFROST_VERION = "v1.1.0" +const VERSION = "v1.1.1" +const BIFROST_VERION = "v1.1.1" var l sync.RWMutex @@ -115,6 +115,7 @@ type PluginParam struct { replaceInto bool // 记录当前表是否有replace into操作 PriKey []fieldStruct ckPriKey string // ck 主键字段 + ckPriKeyFieldIsInt bool // ck 主键存储类型是否为int类型 mysqlPriKey string //ck对应 mysql 的主键id Data *dataTableStruct SyncType SyncType @@ -173,10 +174,18 @@ func (This *Conn) getCktFieldType() { } if len(ckFields) == 0{ return -} + } ckFieldsMap := make(map[string]string) for _,v:=range ckFields{ ckFieldsMap[v.Name] = v.Type + if v.Name == This.p.mysqlPriKey { + switch v.Type { + case "Int8","Nullable(Int8)","UInt8","Nullable(UInt8)","Int16","Nullable(Int16)","UInt16","Nullable(UInt16)","Int32","Nullable(Int32)","UInt32","Nullable(UInt32)","Int64","Nullable(Int64)","UInt64","Nullable(UInt64)": + This.p.ckPriKeyFieldIsInt = true + default: + This.p.ckPriKeyFieldIsInt = false + } + } } for k,v:=range This.p.Field{ @@ -281,6 +290,13 @@ func (This *Conn) getStmt(Type string) dbDriver.Stmt { log.Println("clickhouse getStmt delete err:",This.conn.err) } break + default: + //默认是传sql进来 + stmt,This.conn.err = This.conn.conn.Prepare(Type) + if This.conn.err != nil{ + log.Println("clickhouse getStmt err:",This.conn.err," sql:",Type) + } + break } if This.conn.err != nil{ @@ -455,21 +471,36 @@ func (This *Conn) Commit() (b *pluginDriver.PluginBinlog,e error) { return nil, This.conn.err } + // delete 的话,将多条数据,where id in (1,2) 方式合并 if len(deleteDataMap) > 0 { stmt = This.getStmt("delete") if stmt == nil { goto errLoop } + keys := make([]dbDriver.Value,0) for _, v := range deleteDataMap { - where := make([]dbDriver.Value, 1) - where[0] = v[This.p.mysqlPriKey] - _, This.err = stmt.Exec(where) + keys = append(keys,v[This.p.mysqlPriKey]) + } + if len(keys) > 0{ + var where string + //假如字段是int的话,就 in () + if This.p.ckPriKeyFieldIsInt { + where = strings.Replace(strings.Trim(fmt.Sprint(keys), "[]"), " ", ",", -1) + }else{ + where = "'"+strings.Replace(strings.Trim(fmt.Sprint(keys), "[]"), " ", "','", -1)+"'" + } + sql := "ALTER TABLE "+This.p.ckDatakey+" DELETE WHERE "+This.p.ckPriKey+ " in ( " +where+" )" + stmt = This.getStmt(sql) + if stmt == nil { + goto errLoop + } + _, This.err = stmt.Exec([]dbDriver.Value{where}) if This.err != nil { stmt.Close() goto errLoop } + stmt.Close() } - stmt.Close() } if len(insertDataMap) > 0 { diff --git a/plugin/clickhouse/src/clickhouse_test.go b/plugin/clickhouse/src/clickhouse_test.go index 3a24d24f..ee88771f 100644 --- a/plugin/clickhouse/src/clickhouse_test.go +++ b/plugin/clickhouse/src/clickhouse_test.go @@ -56,6 +56,21 @@ func initDBTable(delTable bool) { c.Close() } +func initDBTablePriString(delTable bool) { + c := MyPlugin.NewClickHouseDBConn(url) + sql1:= "CREATE DATABASE IF NOT EXISTS `"+SchemaName+"`" + c.Exec(sql1,[]driver.Value{}) + sql2:="CREATE TABLE IF NOT EXISTS "+SchemaName+"."+TableName+"(id String,testtinyint Int8,testsmallint Int16,testmediumint Int32,testint Int32,testbigint Int64,testvarchar String,testchar String,testenum String,testset String,testtime String,testdate Date,testyear Int16,testtimestamp DateTime,testdatetime DateTime,testfloat Float64,testdouble Float64,testdecimal Float64,testtext String,testblob String,testbit Int64,testbool Int8,testmediumblob String,testlongblob String,testtinyblob String,test_unsinged_tinyint UInt8,test_unsinged_smallint UInt16,test_unsinged_mediumint UInt32,test_unsinged_int UInt32,test_unsinged_bigint UInt64) ENGINE = MergeTree() ORDER BY (id);" + if delTable == false{ + c.Exec(sql2,[]driver.Value{}) + }else{ + sql3 := "DROP TABLE "+SchemaName+"."+TableName + c.Exec(sql3,[]driver.Value{}) + c.Exec(sql2,[]driver.Value{}) + } + c.Close() +} + func TestChechUri(t *testing.T){ myConn := MyPlugin.MyConn{} if err := myConn.CheckUri(url);err!= nil{ @@ -174,6 +189,37 @@ func TestCommit(t *testing.T){ } } +func TestCommitPriKeyIsString(t *testing.T){ + testBefore() + initDBTablePriString(true) + initSyncParam() + insertdata := event.GetTestInsertData() + conn.Insert(insertdata) + conn.Del(event.GetTestDeleteData()) + conn.Update(event.GetTestUpdateData()) + + conn.Insert(event.GetTestInsertData()) + conn.Del(event.GetTestDeleteData()) + conn.Insert(event.GetTestInsertData()) + _,err2 := conn.Commit() + if err2 != nil{ + t.Fatal(err2) + } + + conn.Del(event.GetTestDeleteData()) + conn.Update(event.GetTestUpdateData()) + + conn.Insert(event.GetTestInsertData()) + conn.Del(event.GetTestDeleteData()) + conn.Insert(event.GetTestInsertData()) + conn.Insert(event.GetTestInsertData()) + conn.Insert(event.GetTestInsertData()) + _,err2 = conn.Commit() + if err2 != nil{ + t.Fatal(err2) + } +} + func TestReConnCommit(t *testing.T){ testBefore() initDBTable(false) From 13a02434ea78b86e7d17e9a651d9f916756369b6 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Wed, 15 Jan 2020 23:49:13 +0800 Subject: [PATCH 04/20] =?UTF-8?q?=E5=85=A8=E9=87=8F=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2sql=E6=9F=A5=E8=AF=A2=E5=88=86=E9=A1=B5?= =?UTF-8?q?=E8=AF=AD=E5=8F=A5=E9=80=9A=E8=BF=87=E5=AD=90=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=80=A7=E8=83=BD=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/history/history_test.go | 38 ++++++++++++++++++++++++++++++++++ server/history/init.go | 26 +++++++++++++++++------ 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/server/history/history_test.go b/server/history/history_test.go index a3013525..9250dc42 100644 --- a/server/history/history_test.go +++ b/server/history/history_test.go @@ -8,6 +8,7 @@ import ( "database/sql/driver" "reflect" "fmt" + "strconv" ) func TestGetDataList(t *testing.T) { @@ -455,4 +456,41 @@ func TestChekcDataType(t *testing.T) { }else{ t.Fatal(" test failed") } +} + +func TestCreateSQL(t *testing.T) { + This := &history.History{ + DbName:"test", + SchemaName:"bifrost_test", + TableName:"bristol_performance_test", + Status:history.HISTORY_STATUS_CLOSE, + NowStartI:0, + Property:history.HistoryProperty{ + ThreadNum:1, + ThreadCountPer:10, + Where:" id > 0 ", + }, + Uri:"root:@tcp(127.0.0.1:3306)/bifrost_test", + } + start := 0 + TablePriKey := "id" + var sql = "" + var where string = "" + if This.Property.Where != "" { + where = " WHERE " + This.Property.Where + } + var limit string = "" + limit = " LIMIT " + strconv.Itoa(start) + "," + strconv.Itoa(This.Property.ThreadCountPer); + if TablePriKey == "" { + sql = "select * from `" + This.SchemaName + "`.`" + This.TableName + "`" + where + limit + //sql := "select * from ? LIMIT ?,?" + }else{ + sql = "select a.* from `" + This.SchemaName + "`.`" + This.TableName + "` as a " + sql += " inner join (" + sql += " select "+ TablePriKey +" from `" + This.SchemaName + "`.`" + This.TableName + "`"+ where + limit + sql += " ) as b" + sql += " on a."+TablePriKey + " = b."+TablePriKey + } + + t.Log(sql) } \ No newline at end of file diff --git a/server/history/init.go b/server/history/init.go index a306dbc1..0e7fdf2c 100644 --- a/server/history/init.go +++ b/server/history/init.go @@ -300,24 +300,38 @@ func (This *History) threadStart(i int) { */ n := len(This.Fields) Pri := make([]*string,0) + TablePriKey := "" for _,v := range This.Fields{ if strings.ToUpper(*v.COLUMN_KEY) == "PRI"{ Pri = append(Pri,v.COLUMN_NAME) + if TablePriKey == ""{ + TablePriKey = *v.COLUMN_NAME + } } } + var where string = "" + if This.Property.Where != "" { + where = " WHERE " + This.Property.Where + } for { This.Lock() start = This.NowStartI This.NowStartI += This.Property.ThreadCountPer This.Unlock() - sql := "select * from `"+This.SchemaName+"`.`"+This.TableName +"`" - if This.Property.Where != ""{ - sql += " WHERE " + This.Property.Where + var sql = "" + var limit string = "" + limit = " LIMIT " + strconv.Itoa(start) + "," + strconv.Itoa(This.Property.ThreadCountPer); + if TablePriKey == "" { + sql = "select * from `" + This.SchemaName + "`.`" + This.TableName + "`" + where + limit + //sql := "select * from ? LIMIT ?,?" + }else{ + sql = "select a.* from `" + This.SchemaName + "`.`" + This.TableName + "` as a " + sql += " inner join (" + sql += " select "+ TablePriKey +" from `" + This.SchemaName + "`.`" + This.TableName + "`"+ where + limit + sql += " ) as b" + sql += " on a."+TablePriKey + " = b."+TablePriKey } - sql += " LIMIT " + strconv.Itoa(start) + "," + strconv.Itoa(This.Property.ThreadCountPer) - //sql := "select * from ? LIMIT ?,?" - stmt, err := db.Prepare(sql) if err != nil{ This.ThreadPool[i].Error = err From 8bcc560d337e8f2d2867fc18715204a134cdbca5 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Wed, 15 Jan 2020 23:49:48 +0800 Subject: [PATCH 05/20] v1.1.1-beta.02 --- config/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/version.go b/config/version.go index e269171a..51701d7c 100755 --- a/config/version.go +++ b/config/version.go @@ -15,4 +15,4 @@ limitations under the License. */ package config -const VERSION = "v1.1.1-beta.01" +const VERSION = "v1.1.1-beta.02" From e79174f32bcdd29742bdc6a7dcd2def2a61d6fc9 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Fri, 17 Jan 2020 23:05:58 +0800 Subject: [PATCH 06/20] =?UTF-8?q?=E5=81=87=E5=A6=82postion=3D=3D4=20?= =?UTF-8?q?=E7=9B=B4=E6=8E=A5=E6=94=BE=E8=BF=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Bristol/mysql/binlog_check.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Bristol/mysql/binlog_check.go b/Bristol/mysql/binlog_check.go index dd8323ee..e03112c7 100644 --- a/Bristol/mysql/binlog_check.go +++ b/Bristol/mysql/binlog_check.go @@ -8,19 +8,23 @@ import ( ) func CheckBinlogIsRight(dbUri string,filename string, position uint32) error{ + // position == 4 是 Format_desc 事件 + if position == 4{ + return nil + } db := NewConnect(dbUri) defer db.Close() sql := "show binlog events IN '"+filename+"' FROM "+ strconv.FormatInt(int64(position),10) +" LIMIT 1" stmt,err := db.Prepare(sql) - if err !=nil{ + if err != nil{ return err } defer stmt.Close() rows, err := stmt.Query([]driver.Value{}) - defer rows.Close() if err != nil { return err } + defer rows.Close() var returnErr error for { dest := make([]driver.Value, 6, 6) @@ -32,7 +36,7 @@ func CheckBinlogIsRight(dbUri string,filename string, position uint32) error{ Event_type :=dest[2].(string) switch Event_type { - case "Update_rows","Delete_rows","Insert_rows","Write_rows","Update_rows_v1","Delete_rows_v1","Insert_rows_v1","Write_rows_v1","Update_rows_v0","Delete_rows_v0","Insert_rows_v0","Write_rows_v0","Update_rows_v2","Delete_rows_v2","Insert_rows_v2","Write_rows_v2": + case "Update_rows","Delete_rows","Insert_rows","Write_rows","Format_desc","Update_rows_v1","Delete_rows_v1","Insert_rows_v1","Write_rows_v1","Update_rows_v0","Delete_rows_v0","Insert_rows_v0","Write_rows_v0","Update_rows_v2","Delete_rows_v2","Insert_rows_v2","Write_rows_v2": returnErr = fmt.Errorf("binlog position can't be "+Event_type) break default: From 309d26fbd3ba349660a0893a9b53f82ae94e7e13 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Fri, 17 Jan 2020 23:08:14 +0800 Subject: [PATCH 07/20] Bristol v1.1.1 --- Bristol/mysql/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Bristol/mysql/version.go b/Bristol/mysql/version.go index 9f9639eb..ca50b5a8 100644 --- a/Bristol/mysql/version.go +++ b/Bristol/mysql/version.go @@ -1,3 +1,3 @@ package mysql -const VERSION = "v1.1.0" +const VERSION = "v1.1.1" From 35c568d7aeb311ecbea2f70315095637b014d55e Mon Sep 17 00:00:00 2001 From: jc3wish Date: Fri, 17 Jan 2020 23:10:28 +0800 Subject: [PATCH 08/20] =?UTF-8?q?=E5=90=AF=E5=8A=A8=E7=9A=84=E6=97=B6?= =?UTF-8?q?=E5=80=99=E8=8E=B7=E5=8F=96=E6=AD=A3=E7=A1=AE=E4=BD=8D=E7=82=B9?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E9=83=A8=E5=88=86=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/db.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/db.go b/server/db.go index f2679ebd..67672e96 100755 --- a/server/db.go +++ b/server/db.go @@ -26,6 +26,7 @@ import ( "strings" "strconv" "fmt" + "runtime/debug" ) var dbAndTableSplitChars = "_-" @@ -324,15 +325,16 @@ func (db *db) AddReplicateDoDb(dbName string) bool { func (db *db) getRightBinlogPosition() (newPosition uint32) { defer func() { if err := recover();err != nil{ - log.Println(db.Name," getRightBinlogPosition recover err:",err) + log.Println(db.Name," getRightBinlogPosition recover err:",err," binlogDumpFileName:",db.binlogDumpFileName," binlogDumpPosition:",db.binlogDumpPosition) + log.Println(string(debug.Stack())) newPosition = 0 } }() err := mysql.CheckBinlogIsRight(db.ConnectUri,db.binlogDumpFileName,db.binlogDumpPosition) - if err ==nil { + if err == nil { return db.binlogDumpPosition } - log.Println(db.Name," getRightBinlogPosition err:",err) + log.Println(db.Name," getRightBinlogPosition err:",err," binlogDumpFileName:",db.binlogDumpFileName," binlogDumpPosition:",db.binlogDumpPosition) if strings.Index(err.Error(),"connect: operation timed out") != -1 { return newPosition } From e0a97003282314a0fbcb9405825ec3fc0e52421e Mon Sep 17 00:00:00 2001 From: jc3wish Date: Sat, 18 Jan 2020 11:20:49 +0800 Subject: [PATCH 09/20] =?UTF-8?q?=E9=80=BB=E8=BE=91=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Bristol/mysql/binlog_check.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Bristol/mysql/binlog_check.go b/Bristol/mysql/binlog_check.go index e03112c7..d25a83eb 100644 --- a/Bristol/mysql/binlog_check.go +++ b/Bristol/mysql/binlog_check.go @@ -36,7 +36,7 @@ func CheckBinlogIsRight(dbUri string,filename string, position uint32) error{ Event_type :=dest[2].(string) switch Event_type { - case "Update_rows","Delete_rows","Insert_rows","Write_rows","Format_desc","Update_rows_v1","Delete_rows_v1","Insert_rows_v1","Write_rows_v1","Update_rows_v0","Delete_rows_v0","Insert_rows_v0","Write_rows_v0","Update_rows_v2","Delete_rows_v2","Insert_rows_v2","Write_rows_v2": + case "Update_rows","Delete_rows","Insert_rows","Write_rows","Update_rows_v1","Delete_rows_v1","Insert_rows_v1","Write_rows_v1","Update_rows_v0","Delete_rows_v0","Insert_rows_v0","Write_rows_v0","Update_rows_v2","Delete_rows_v2","Insert_rows_v2","Write_rows_v2": returnErr = fmt.Errorf("binlog position can't be "+Event_type) break default: From 46a18f339a9bfda1e58a2e46cb2990f464cddcd5 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Mon, 10 Feb 2020 20:17:27 +0800 Subject: [PATCH 10/20] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dmysql=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E5=9C=A8=E6=95=B0=E6=8D=AE=E5=BA=93=E6=9C=89=E5=AD=98=E5=9C=A8?= =?UTF-8?q?=E7=89=B9=E6=AE=8A=E7=AC=A6=E5=8F=B7=E7=9A=84=E6=97=B6=E5=80=99?= =?UTF-8?q?=E6=8A=A5=E8=AF=AD=E5=8F=A5=E9=94=99=E8=AF=AF=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin/mysql/src/mysql.go | 6 +++--- plugin/mysql/src/mysql_test.go | 23 +++++++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/plugin/mysql/src/mysql.go b/plugin/mysql/src/mysql.go index 7cb076bd..e5db7009 100644 --- a/plugin/mysql/src/mysql.go +++ b/plugin/mysql/src/mysql.go @@ -14,8 +14,8 @@ import ( ) -const VERSION = "v1.1.0" -const BIFROST_VERION = "v1.1.0" +const VERSION = "v1.1.1" +const BIFROST_VERION = "v1.1.1" type dataTableStruct struct { MetaMap map[string]string //字段类型 @@ -138,7 +138,7 @@ func (This *Conn) GetParam(p interface{}) (*PluginParam,error){ param.BatchSize = 500 } param.Data = &dataTableStruct{Data:make([]*pluginDriver.PluginDataType,0)} - param.Datakey = param.Schema+"."+param.Table + param.Datakey = "`"+param.Schema+"`.`"+param.Table+"`" param.toPriKey = param.PriKey[0].ToField param.mysqlPriKey = param.PriKey[0].FromMysqlField param.fieldCount = len(param.Field) diff --git a/plugin/mysql/src/mysql_test.go b/plugin/mysql/src/mysql_test.go index 25e6e467..c9d87ae6 100644 --- a/plugin/mysql/src/mysql_test.go +++ b/plugin/mysql/src/mysql_test.go @@ -503,4 +503,27 @@ func TestRandDataAndCheck(t *testing.T){ t.Log("mysql Table Count:",count," srcDataCount:",len(e.GetDataMap())) t.Log("test over") +} + +func TestCommitBySymbol(t *testing.T){ + + url = "root:@tcp(127.0.0.1:3306)/bifrost_test" + beforeTest() + TableName = "binlog_field_test-3" + conn := getPluginConn() + initDBTable(false) + + e := pluginTestData.NewEvent() + + conn.Insert(e.GetTestInsertData()) + conn.Del(e.GetTestDeleteData()) + conn.Update(e.GetTestUpdateData()) + conn.Insert(e.GetTestInsertData()) + conn.Insert(e.GetTestInsertData()) + conn.Insert(e.GetTestInsertData()) + + _,err2 := conn.Commit() + if err2 != nil{ + log.Fatal(err2) + } } \ No newline at end of file From 29dd1e50728b91171eddfe09f20a02257c8ebe42 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Mon, 10 Feb 2020 20:17:54 +0800 Subject: [PATCH 11/20] v1.1.1-beta.07 --- config/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/version.go b/config/version.go index 51701d7c..73202702 100755 --- a/config/version.go +++ b/config/version.go @@ -15,4 +15,4 @@ limitations under the License. */ package config -const VERSION = "v1.1.1-beta.02" +const VERSION = "v1.1.1-beta.07" From 351244ed85c71d110a0b0b6aab9bb4d09b3d4c12 Mon Sep 17 00:00:00 2001 From: jc3wish Date: Tue, 11 Feb 2020 18:30:14 +0800 Subject: [PATCH 12/20] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dmysql=E5=86=99=E5=85=A5?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=E5=90=8D=E6=98=AF=E4=BF=9D=E7=95=99=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E6=8A=A5=E9=94=99=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin/mysql/src/mysql.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plugin/mysql/src/mysql.go b/plugin/mysql/src/mysql.go index e5db7009..6f2be401 100644 --- a/plugin/mysql/src/mysql.go +++ b/plugin/mysql/src/mysql.go @@ -509,10 +509,10 @@ func (This *Conn) getStmt(Type EventType) dbDriver.Stmt{ values := "" for _,v:= range This.p.Field{ if fields == ""{ - fields = v.ToField + fields = "`"+v.ToField+"`" values = "?" }else{ - fields += ","+v.ToField + fields += ",`"+v.ToField+"`" values += ",?" } } @@ -526,9 +526,9 @@ func (This *Conn) getStmt(Type EventType) dbDriver.Stmt{ where := "" for _,v:= range This.p.PriKey{ if where == ""{ - where = v.ToField+"=?" + where = "`"+v.ToField+"`=?" }else{ - where += " AND "+v.ToField+"=?" + where += " AND `"+v.ToField+"`=?" } } This.p.stmtArr[Type],This.conn.err = This.conn.conn.Prepare("DELETE FROM "+This.p.Datakey+" WHERE "+where) @@ -542,13 +542,13 @@ func (This *Conn) getStmt(Type EventType) dbDriver.Stmt{ fields2 := "" for _,v:= range This.p.Field{ if fields == ""{ - fields = v.ToField + fields = "`"+v.ToField+"`" values = "?" - fields2 = v.ToField+"=?" + fields2 = "`"+v.ToField+"`=?" }else{ - fields += ","+v.ToField + fields += ",`"+v.ToField+"`" values += ",?" - fields2 += ","+v.ToField+"=?" + fields2 += ",`"+v.ToField+"`=?" } } sql := "INSERT INTO "+This.p.Datakey+" ("+fields+") VALUES ("+values+") ON DUPLICATE KEY UPDATE "+fields2 From 7e91d2f1234c3d7f9c95a5f53ac3e21774eb3a45 Mon Sep 17 00:00:00 2001 From: zero Date: Fri, 10 Apr 2020 23:10:50 +0800 Subject: [PATCH 13/20] =?UTF-8?q?char=E5=AD=98=E5=82=A8=E5=AD=97=E8=8A=82?= =?UTF-8?q?=E5=A4=A7=E4=BA=8E255=E8=A7=A3=E6=9E=90=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Bristol/mysql/binlog.go | 14 ++++++++++++-- Bristol/mysql/event_row.go | 30 +++++++++++++++++++++++++++--- Bristol/mysql/lib.go | 1 + Bristol/mysql/version.go | 2 +- 4 files changed, 41 insertions(+), 6 deletions(-) diff --git a/Bristol/mysql/binlog.go b/Bristol/mysql/binlog.go index 8bac2731..73ef3ba8 100755 --- a/Bristol/mysql/binlog.go +++ b/Bristol/mysql/binlog.go @@ -210,6 +210,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string, parser.conn.Close() } errs = fmt.Errorf(fmt.Sprint(err)) + log.Println(err) } }() if parser.connStatus == 0 { @@ -217,7 +218,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string, } //set dbAndTable Name tableId parser.tableNameMap[database+"."+tablename] = tableId - sql := "SELECT COLUMN_NAME,COLUMN_KEY,COLUMN_TYPE,CHARACTER_SET_NAME,COLLATION_NAME,NUMERIC_SCALE,EXTRA,COLUMN_DEFAULT,DATA_TYPE FROM information_schema.columns WHERE table_schema='" + database + "' AND table_name='" + tablename + "' ORDER BY `ORDINAL_POSITION` ASC" + sql := "SELECT COLUMN_NAME,COLUMN_KEY,COLUMN_TYPE,CHARACTER_SET_NAME,COLLATION_NAME,NUMERIC_SCALE,EXTRA,COLUMN_DEFAULT,DATA_TYPE,CHARACTER_OCTET_LENGTH FROM information_schema.columns WHERE table_schema='" + database + "' AND table_name='" + tablename + "' ORDER BY `ORDINAL_POSITION` ASC" stmt, err := parser.conn.Prepare(sql) p := make([]driver.Value, 0) rows, err := stmt.Query(p) @@ -231,7 +232,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string, ColumnSchemaTypeList: make([]*column_schema_type,0), } for { - dest := make([]driver.Value, 9, 9) + dest := make([]driver.Value, 10, 10) err := rows.Next(dest) if err != nil { break @@ -245,6 +246,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string, var enum_values, set_values []string var COLUMN_DEFAULT string var DATA_TYPE string + var CHARACTER_OCTET_LENGTH uint64 COLUMN_NAME = dest[0].(string) COLUMN_KEY = dest[1].(string) @@ -310,6 +312,13 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string, } else { set_values = make([]string, 0) } + + if dest[9] == nil{ + CHARACTER_OCTET_LENGTH = 0 + }else{ + CHARACTER_OCTET_LENGTH = dest[9].(uint64) + } + tableInfo.ColumnSchemaTypeList = append(tableInfo.ColumnSchemaTypeList, &column_schema_type{ COLUMN_NAME: COLUMN_NAME, COLUMN_KEY: COLUMN_KEY, @@ -325,6 +334,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string, NUMERIC_SCALE:NUMERIC_SCALE, COLUMN_DEFAULT:COLUMN_DEFAULT, DATA_TYPE:DATA_TYPE, + CHARACTER_OCTET_LENGTH:CHARACTER_OCTET_LENGTH, }) if strings.ToUpper(COLUMN_KEY) == "PRI"{ diff --git a/Bristol/mysql/event_row.go b/Bristol/mysql/event_row.go index 699b2d8a..5ff64102 100755 --- a/Bristol/mysql/event_row.go +++ b/Bristol/mysql/event_row.go @@ -345,10 +345,33 @@ func (parser *eventParser) parseEventRow(buf *bytes.Buffer, tableMap *TableMapEv case FIELD_TYPE_STRING: var length int - var b byte - b, e = buf.ReadByte() - length = int(b) + if tableSchemaMap[i].CHARACTER_OCTET_LENGTH > 255 { + var short uint16 + e = binary.Read(buf, binary.LittleEndian, &short) + length = int(short) + }else{ + var b byte + b, e = buf.ReadByte() + length = int(b) + } + //row[column_name] = string(buf.Next(length+1)) + /* + log.Println("======================") + log.Println("column_name:", column_name," length:",length) + //log.Println("name:",tableMap.columnMetaData[i].name) + log.Println("size:",tableMap.columnMetaData[i].size) + log.Println("precision:",tableMap.columnMetaData[i].precision) + log.Println("max_length:",tableMap.columnMetaData[i].max_length) + log.Println("length_size:",tableMap.columnMetaData[i].length_size) + log.Println("fsp:",tableMap.columnMetaData[i].fsp) + log.Println("decimals:",tableMap.columnMetaData[i].decimals) + log.Println("unsigned:",tableMap.columnMetaData[i].unsigned) + log.Println("column_type:",tableMap.columnMetaData[i].column_type) + log.Println("tableMap.columnMetaData[i]:",tableMap.columnMetaData[i]) + */ row[column_name] = string(buf.Next(length)) + //log.Println("column_name: ",column_name," == ",row[column_name]) + break case FIELD_TYPE_ENUM: @@ -558,6 +581,7 @@ func (parser *eventParser) parseEventRow(buf *bytes.Buffer, tableMap *TableMapEv default: return nil, fmt.Errorf("Unknown FieldType %d", tableMap.columnTypes[i]) } + //log.Println("column_name:",column_name," row[column_name]:",row[column_name]) if e != nil { log.Println("lastField err:",column_name,tableMap.columnMetaData[i].column_type,e) return nil, e diff --git a/Bristol/mysql/lib.go b/Bristol/mysql/lib.go index e4716c74..1d9c73df 100755 --- a/Bristol/mysql/lib.go +++ b/Bristol/mysql/lib.go @@ -26,6 +26,7 @@ type column_schema_type struct { auto_increment bool COLUMN_DEFAULT string DATA_TYPE string + CHARACTER_OCTET_LENGTH uint64 } type MysqlConnection interface { diff --git a/Bristol/mysql/version.go b/Bristol/mysql/version.go index ca50b5a8..547051b8 100644 --- a/Bristol/mysql/version.go +++ b/Bristol/mysql/version.go @@ -1,3 +1,3 @@ package mysql -const VERSION = "v1.1.1" +const VERSION = "v1.1.2-beta.01" From 70718df5ac8feb8c0a935611576e66fdf1a9d375 Mon Sep 17 00:00:00 2001 From: zero Date: Fri, 10 Apr 2020 23:12:32 +0800 Subject: [PATCH 14/20] v1.1.1-beta.08 --- config/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/version.go b/config/version.go index 73202702..00fd1de7 100755 --- a/config/version.go +++ b/config/version.go @@ -15,4 +15,4 @@ limitations under the License. */ package config -const VERSION = "v1.1.1-beta.07" +const VERSION = "v1.1.1-beta.08" From 9443d47990e731b282df3a0159afec5ab3e4b87a Mon Sep 17 00:00:00 2001 From: jc3wish Date: Tue, 14 Apr 2020 23:14:13 +0800 Subject: [PATCH 15/20] =?UTF-8?q?=E6=96=B0=E5=A2=9ENullTransferDefault?= =?UTF-8?q?=E9=85=8D=E7=BD=AE,=E5=8F=AF=E5=BC=BA=E5=88=B6=E5=B0=86NULL?= =?UTF-8?q?=E8=BD=AC=E6=88=90=E5=AF=B9=E5=BA=94=E7=9A=84=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin/mysql/src/mysql.go | 61 +++++++++++++++++++++++++++++-------- plugin/mysql/www/doc.html | 45 ++++++++++++++++++++++++++- plugin/mysql/www/mysql.html | 19 +++++++----- plugin/mysql/www/mysql.js | 7 ++++- 4 files changed, 110 insertions(+), 22 deletions(-) diff --git a/plugin/mysql/src/mysql.go b/plugin/mysql/src/mysql.go index 6f2be401..7644981e 100644 --- a/plugin/mysql/src/mysql.go +++ b/plugin/mysql/src/mysql.go @@ -121,6 +121,7 @@ type PluginParam struct { Data *dataTableStruct fieldCount int stmtArr []dbDriver.Stmt + NullTransferDefault bool //是否将null值强制转成相对应类型的默认值 } @@ -339,9 +340,7 @@ func (This *Conn) Commit() (b *pluginDriver.PluginBinlog,e error) { case "update": val := make([]dbDriver.Value,This.p.fieldCount*2) for i,v:=range This.p.Field{ - //toV,This.err = dataTypeTransfer(data.Rows[1][v.FromMysqlField],v.ToField,v.ToFieldType,v.ToFieldDefault) - - toV,This.err = dataTypeTransfer(This.getMySQLData(data,1,v.FromMysqlField), v.ToField,v.ToFieldType,v.ToFieldDefault) + toV,This.err = This.dataTypeTransfer(This.getMySQLData(data,1,v.FromMysqlField), v.ToField,v.ToFieldType,v.ToFieldDefault) if This.err != nil{ return nil,This.err @@ -364,8 +363,7 @@ func (This *Conn) Commit() (b *pluginDriver.PluginBinlog,e error) { case "delete": where := make([]dbDriver.Value,0) for _,v := range This.p.PriKey{ - toV,This.err = dataTypeTransfer(This.getMySQLData(data,0,v.FromMysqlField), v.ToField,v.ToFieldType,v.ToFieldDefault) - //toV,_ = dataTypeTransfer(data.Rows[0][v.FromMysqlField],v.ToField,v.ToFieldType,v.ToFieldDefault) + toV,This.err = This.dataTypeTransfer(This.getMySQLData(data,0,v.FromMysqlField), v.ToField,v.ToFieldType,v.ToFieldDefault) where = append(where,toV) } if checkOpMap(data.Rows[0][This.p.mysqlPriKey], "delete") == false { @@ -384,8 +382,7 @@ func (This *Conn) Commit() (b *pluginDriver.PluginBinlog,e error) { val := make([]dbDriver.Value,0) i:=0 for _,v:=range This.p.Field{ - toV,This.err = dataTypeTransfer(This.getMySQLData(data,0,v.FromMysqlField), v.ToField,v.ToFieldType,v.ToFieldDefault) - //toV,This.err = dataTypeTransfer(data.Rows[0][v.FromMysqlField],v.ToField,v.ToFieldType,v.ToFieldDefault) + toV,This.err = This.dataTypeTransfer(This.getMySQLData(data,0,v.FromMysqlField), v.ToField,v.ToFieldType,v.ToFieldDefault) if This.err != nil{ return nil,This.err } @@ -435,23 +432,61 @@ func (This *Conn) Commit() (b *pluginDriver.PluginBinlog,e error) { return &pluginDriver.PluginBinlog{list[n-1].BinlogFileNum,list[n-1].BinlogPosition}, nil } -func dataTypeTransfer(data interface{},fieldName string,toDataType string,defaultVal *string) (v dbDriver.Value,e error) { +func (This *Conn) dataTypeTransfer(data interface{},fieldName string,toDataType string,defaultVal *string) (v dbDriver.Value,e error) { defer func() { if err := recover();err != nil{ log.Fatal(string(debug.Stack())) e = fmt.Errorf(fieldName+" "+fmt.Sprint(err)) } }() - if data == nil{ - if defaultVal == nil{ - v = nil - return + if data == nil { + if This.p.NullTransferDefault == false{ + if defaultVal == nil{ + v = nil + return + }else{ + data = *defaultVal + } }else{ - data = *defaultVal + //假如配置是强制转成默认值 + switch toDataType { + case "int","tinyint","smallint","mediumint","bigint","bool": + v = "0" + break + case "bit": + v = int64(0) + break + case "date": + v = "0000-00-00" + break + case "timestamp": + v = "0000-00-00 00:00:00" + break + case "datetime": + v = "0000-00-00 00:00:00" + break + case "time": + v = "00:00:00" + break + case "year": + v = "0000" + break + case "float","double","decimal","number","point": + v = "0.00" + break + default: + v = "" + break + } + return } } switch toDataType { case "bool": + if data == nil{ + v = false + break + } switch data.(type) { case bool: if data.(bool) == true{ diff --git a/plugin/mysql/www/doc.html b/plugin/mysql/www/doc.html index c029a147..025447e6 100644 --- a/plugin/mysql/www/doc.html +++ b/plugin/mysql/www/doc.html @@ -100,7 +100,50 @@

NULL 值

-

所有数据类型都支持Null值,假如源表是 Null ,但是目标库有默认值,则同步的时候,目标表以默认值填充,否则为 Null

+

所有数据类型都支持Null值,假如源表是 Null ,但是目标库有默认值,则同步的时候,目标表以默认值填充

+

假如 NullTransferDefault参数配置成了True,即使目标库表中没有默认值,也会强制转成相对应类型的默认值

+ +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
类型默认值
int,bigint,smallinit,mediumint,tinyint,bit0/td> +
datetime,timestamp0000-00-00 00:00:00
date0000-00-00
time00:00:00
year0000
boolfalse
其他空("")
+

 

标签

diff --git a/plugin/mysql/www/mysql.html b/plugin/mysql/www/mysql.html index cf1ed5aa..cb048bdb 100644 --- a/plugin/mysql/www/mysql.html +++ b/plugin/mysql/www/mysql.html @@ -34,12 +34,6 @@ -

 

@@ -52,6 +46,17 @@ * 多少条刷一次数据 + +
+ +
+ + True(NULL会强制转成对应类型的默认值) False(不进行转换) 建议选择为True +
+
@@ -62,7 +67,7 @@