From 7dfd04266a8aed5255c9419166c1971a1779e959 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Fri, 16 Nov 2018 19:09:56 +0800 Subject: [PATCH 01/13] Update sync.go --- river/sync.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/river/sync.go b/river/sync.go index 3b3854ee..8965e8f4 100644 --- a/river/sync.go +++ b/river/sync.go @@ -514,6 +514,15 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat)) } + } else { + if col.Type == schema.TYPE_STRING { + col.Type = schema.TYPE_DATETIME + v := r.makeReqColumnData(col, value) + str, _ := v.(string) + stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) + t := int64(stamp.Unix()) + fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat)) + } } } From 7350dff007cacb8b31543bbb489c081d5110e624 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Tue, 20 Nov 2018 11:30:16 +0800 Subject: [PATCH 02/13] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4992d48b..b2d4e7c5 100644 --- a/README.md +++ b/README.md @@ -203,7 +203,7 @@ Although there are some other MySQL rivers for Elasticsearch, like [elasticsearc ## Todo + MySQL 8 -+ ES 6 ++ ES 6 (After verification (version 6.4.2), it is now supported. Delete and update are supported) + Statistic. ## Donate From c31ee199f2f28702c9b0d800e35d616bfd09422b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Tue, 20 Nov 2018 11:56:42 +0800 Subject: [PATCH 03/13] Update river.toml --- etc/river.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/etc/river.toml b/etc/river.toml index a5db6b7a..d69da4fe 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -105,6 +105,8 @@ id="es_id" tags="es_tags,list" # Map column `keywords` to ES with array type keywords=",list" +# Map column (mysql type varchar(255) "2018-11-20 10:10:10") `time` to ES with date type +time="time,date" # Filter rule # From 99e1dd0930bab454d4f4c9b6e72456725bfadaa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Wed, 21 Nov 2018 15:58:58 +0800 Subject: [PATCH 04/13] Update sync.go --- river/sync.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/river/sync.go b/river/sync.go index 8965e8f4..885b39f4 100644 --- a/river/sync.go +++ b/river/sync.go @@ -519,7 +519,10 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i col.Type = schema.TYPE_DATETIME v := r.makeReqColumnData(col, value) str, _ := v.(string) - stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) + + //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) + // time.RFC3339 解析的时间格式是 2018-10-30T01:45:00Z + stamp, _ := time.ParseInLocation(time.RFC3339, str, time.Local) t := int64(stamp.Unix()) fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat)) } From d2c9fb4f0c604de4e1e0927dd046e5152905f2f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Wed, 21 Nov 2018 18:24:57 +0800 Subject: [PATCH 05/13] Update sync.go --- river/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/sync.go b/river/sync.go index 885b39f4..4b4604c7 100644 --- a/river/sync.go +++ b/river/sync.go @@ -521,7 +521,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i str, _ := v.(string) //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) - // time.RFC3339 解析的时间格式是 2018-10-30T01:45:00Z + // time.RFC3339 time style is 2018-10-30T01:45:00Z stamp, _ := time.ParseInLocation(time.RFC3339, str, time.Local) t := int64(stamp.Unix()) fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat)) From a23070da213b109c3e574b7692e0cb213d661172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Wed, 21 Nov 2018 18:52:31 +0800 Subject: [PATCH 06/13] Update river.toml --- etc/river.toml | 136 +++++++++++++++++++------------------------------ 1 file changed, 51 insertions(+), 85 deletions(-) diff --git a/etc/river.toml b/etc/river.toml index d69da4fe..cc14b97d 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -1,17 +1,17 @@ # MySQL address, user and password # user must have replication privilege in MySQL. -my_addr = "127.0.0.1:3306" +my_addr = "mysqlhost:3306" my_user = "root" -my_pass = "" +my_pass = "root" my_charset = "utf8" # Set true when elasticsearch use https #es_https = false # Elasticsearch address -es_addr = "127.0.0.1:9200" +es_addr = "eshost:9200" # Elasticsearch user and password, maybe set by shield, nginx, or x-pack -es_user = "" -es_pass = "" +#es_user = "" +#es_pass = "" # Path to store data, like master.info, if not set or empty, # we must use this to support breakpoint resume syncing. @@ -22,10 +22,10 @@ data_dir = "./var" stat_addr = "127.0.0.1:12800" # pseudo server id like a slave -server_id = 1001 +server_id = 123454 # mysql or mariadb -flavor = "mysql" +flavor = "mariadb" # mysqldump execution path # if not set or empty, ignore mysqldump. @@ -46,13 +46,13 @@ skip_no_pk_table = false # MySQL data source [[source]] -schema = "test" +schema = "nfvofcaps" # Only below tables will be synced into Elasticsearch. # "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 # I don't think it is necessary to sync all tables in a database. -tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] - +#tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] +tables = ["FMALARM"] # Below is for special rule mapping # Very simple example @@ -67,84 +67,50 @@ tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] # # The table `t` will be synced to ES index `test` and type `t`. [[rule]] -schema = "test" -table = "t" -index = "test" -type = "t" - -# Wildcard table rule, the wildcard table must be in source tables -# All tables which match the wildcard format will be synced to ES index `test` and type `t`. -# In this example, all tables must have same schema with above table `t`; -[[rule]] -schema = "test" -table = "t_[0-9]{4}" -index = "test" -type = "t" - -# Simple field rule -# -# desc tfield; -# +----------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +----------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | tags | varchar(256) | YES | | NULL | | -# | keywords | varchar(256) | YES | | NULL | | -# +----------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tfield" -index = "test" -type = "tfield" +schema = "nfvofcaps" +table = "FMALARM" +index = "nfvomysql" +type = "mysqltable" + +# The es doc's id will be `id`:`tag` +# It is useful for merge muliple table into one type while theses tables have same PK +id = ["ID","ALARMID"] + [rule.field] # Map column `id` to ES field `es_id` -id="es_id" +#id="es_id" # Map column `tags` to ES field `es_tags` with array type -tags="es_tags,list" +#tags="es_tags,list" # Map column `keywords` to ES with array type -keywords=",list" -# Map column (mysql type varchar(255) "2018-11-20 10:10:10") `time` to ES with date type -time="time,date" +#keywords=",list" + + +ORIGIN="origin" +ID="id" +ALARMID="alarmid" +ALARMTITLE="alarmtitle" +ALARMSTATUS="alarmstatus" +ORIGSEVERITY="origseverity" +ALARMTYPE="alarmtype" +EVENTTIME="eventtime,date" +MSGSEQ="msgseq" +CLEARTIME="cleartime,date" +CLEARFLAG="clearflag" +CLEARMSGSEQ="clearmsgseq" +SPECIFICPROBLEMID="specificproblemid" +SPECIFICPROBLEM="specificproblem" +NEUID="neuid" +NENAME="nename" +NETYPE="netype" +OBJECTUID="objectuid" +OBJECTNAME="objectname" +OBJECTTYPE="objecttype" +LOCATIONINFO="locationinfo" +ADDINFO="addinfo" +PVFLAG="pvflag" +CONFIRMFLAG="confirmflag" +CONFIRMTIME="confirmtime,date" +REMARK="remark" +REMARKTIME="remarktime,date" -# Filter rule -# -# desc tfilter; -# +-------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +-------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | c1 | int(11) | YES | | 0 | | -# | c2 | int(11) | YES | | 0 | | -# | name | varchar(256) | YES | | NULL | | -# +-------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tfilter" -index = "test" -type = "tfilter" - -# Only sync following columns -filter = ["id", "name"] - -# id rule -# -# desc tid_[0-9]{4}; -# +----------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +----------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | tag | varchar(256) | YES | | NULL | | -# | desc | varchar(256) | YES | | NULL | | -# +----------+--------------+------+-----+---------+-------+ -# -[[rule]] -schema = "test" -table = "tid_[0-9]{4}" -index = "test" -type = "t" -# The es doc's id will be `id`:`tag` -# It is useful for merge muliple table into one type while theses tables have same PK -id = ["id", "tag"] From df184c69b9354d2b1b8540af374d2a5f469f100d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Wed, 21 Nov 2018 19:12:00 +0800 Subject: [PATCH 07/13] Update sync.go --- river/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/sync.go b/river/sync.go index 4b4604c7..1829cccf 100644 --- a/river/sync.go +++ b/river/sync.go @@ -522,7 +522,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) // time.RFC3339 time style is 2018-10-30T01:45:00Z - stamp, _ := time.ParseInLocation(time.RFC3339, str, time.Local) + stamp, _ := time.Parse(time.RFC3339, str) t := int64(stamp.Unix()) fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat)) } From 7fd71572cc56f584549be0af9ff6b9abd6025c91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Wed, 21 Nov 2018 19:27:56 +0800 Subject: [PATCH 08/13] Update sync.go --- river/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/sync.go b/river/sync.go index 1829cccf..b56bddd0 100644 --- a/river/sync.go +++ b/river/sync.go @@ -522,7 +522,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) // time.RFC3339 time style is 2018-10-30T01:45:00Z - stamp, _ := time.Parse(time.RFC3339, str) + stamp, _ := time.ParseInLocation("2006-01-02T15:04:05Z", str, time.Local) t := int64(stamp.Unix()) fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat)) } From 94620e3debe5dad22014f2e77e6884f438c1f4f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Wed, 21 Nov 2018 19:45:19 +0800 Subject: [PATCH 09/13] Update sync.go --- river/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/sync.go b/river/sync.go index b56bddd0..a307f882 100644 --- a/river/sync.go +++ b/river/sync.go @@ -520,7 +520,7 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i v := r.makeReqColumnData(col, value) str, _ := v.(string) - //stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) + stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) // time.RFC3339 time style is 2018-10-30T01:45:00Z stamp, _ := time.ParseInLocation("2006-01-02T15:04:05Z", str, time.Local) t := int64(stamp.Unix()) From 1e4a7ab619ddc55bb826f3b64594ae9bf32081d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Thu, 22 Nov 2018 00:14:09 +0800 Subject: [PATCH 10/13] Update sync.go --- river/sync.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/river/sync.go b/river/sync.go index a307f882..66325abd 100644 --- a/river/sync.go +++ b/river/sync.go @@ -515,18 +515,15 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat)) } } else { - if col.Type == schema.TYPE_STRING { - col.Type = schema.TYPE_DATETIME - v := r.makeReqColumnData(col, value) + if col.Type == schema.TYPE_STRING { + v := r.makeReqColumnData(col, value) str, _ := v.(string) - - stamp, _ := time.ParseInLocation("2006-01-02 03:04:05", str, time.Local) - // time.RFC3339 time style is 2018-10-30T01:45:00Z - stamp, _ := time.ParseInLocation("2006-01-02T15:04:05Z", str, time.Local) - t := int64(stamp.Unix()) - fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat)) - } - } + stamp, _ := time.ParseInLocation(time.RFC3339 , str, time.Local) + t := int64(stamp.Unix()) + col.Type = schema.TYPE_DATETIME + fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat)) + } + } } if fieldValue == nil { From c2db5d70cfb7fe1a675ffa3fbc551936e203e64f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Thu, 22 Nov 2018 00:15:08 +0800 Subject: [PATCH 11/13] Update river.toml --- etc/river.toml | 110 +++++-------------------------------------------- 1 file changed, 10 insertions(+), 100 deletions(-) diff --git a/etc/river.toml b/etc/river.toml index cc14b97d..bdb8e7db 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -1,116 +1,26 @@ -# MySQL address, user and password -# user must have replication privilege in MySQL. my_addr = "mysqlhost:3306" my_user = "root" my_pass = "root" my_charset = "utf8" - -# Set true when elasticsearch use https -#es_https = false -# Elasticsearch address es_addr = "eshost:9200" -# Elasticsearch user and password, maybe set by shield, nginx, or x-pack -#es_user = "" -#es_pass = "" - -# Path to store data, like master.info, if not set or empty, -# we must use this to support breakpoint resume syncing. -# TODO: support other storage, like etcd. data_dir = "./var" - -# Inner Http status address stat_addr = "127.0.0.1:12800" - -# pseudo server id like a slave -server_id = 123454 - -# mysql or mariadb -flavor = "mariadb" - -# mysqldump execution path -# if not set or empty, ignore mysqldump. +server_id = 1234 +flavor = "mysql" mysqldump = "mysqldump" - -# if we have no privilege to use mysqldump with --master-data, -# we must skip it. -#skip_master_data = false - -# minimal items to be inserted in one bulk bulk_size = 128 - -# force flush the pending requests if we don't have enough items >= bulk_size flush_bulk_time = "200ms" - -# Ignore table without primary key skip_no_pk_table = false - -# MySQL data source [[source]] -schema = "nfvofcaps" - -# Only below tables will be synced into Elasticsearch. -# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 -# I don't think it is necessary to sync all tables in a database. -#tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] -tables = ["FMALARM"] -# Below is for special rule mapping - -# Very simple example -# -# desc t; -# +-------+--------------+------+-----+---------+-------+ -# | Field | Type | Null | Key | Default | Extra | -# +-------+--------------+------+-----+---------+-------+ -# | id | int(11) | NO | PRI | NULL | | -# | name | varchar(256) | YES | | NULL | | -# +-------+--------------+------+-----+---------+-------+ -# -# The table `t` will be synced to ES index `test` and type `t`. +schema = "RUNOOB" +tables = ["runoob_tbl"] [[rule]] -schema = "nfvofcaps" -table = "FMALARM" -index = "nfvomysql" -type = "mysqltable" - -# The es doc's id will be `id`:`tag` -# It is useful for merge muliple table into one type while theses tables have same PK -id = ["ID","ALARMID"] +schema = "RUNOOB" +table = "runoob_tbl" +index = "gomysql110" +type = "go" +id = ["runoob_id"] [rule.field] -# Map column `id` to ES field `es_id` -#id="es_id" -# Map column `tags` to ES field `es_tags` with array type -#tags="es_tags,list" -# Map column `keywords` to ES with array type -#keywords=",list" - - -ORIGIN="origin" -ID="id" -ALARMID="alarmid" -ALARMTITLE="alarmtitle" -ALARMSTATUS="alarmstatus" -ORIGSEVERITY="origseverity" -ALARMTYPE="alarmtype" -EVENTTIME="eventtime,date" -MSGSEQ="msgseq" -CLEARTIME="cleartime,date" -CLEARFLAG="clearflag" -CLEARMSGSEQ="clearmsgseq" -SPECIFICPROBLEMID="specificproblemid" -SPECIFICPROBLEM="specificproblem" -NEUID="neuid" -NENAME="nename" -NETYPE="netype" -OBJECTUID="objectuid" -OBJECTNAME="objectname" -OBJECTTYPE="objecttype" -LOCATIONINFO="locationinfo" -ADDINFO="addinfo" -PVFLAG="pvflag" -CONFIRMFLAG="confirmflag" -CONFIRMTIME="confirmtime,date" -REMARK="remark" -REMARKTIME="remarktime,date" - +runoob_title="runoob_title,date" From d8f6d36047a8760e8abfd097b5630991fab67ba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Thu, 22 Nov 2018 00:25:51 +0800 Subject: [PATCH 12/13] Update river.toml --- etc/river.toml | 109 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 99 insertions(+), 10 deletions(-) diff --git a/etc/river.toml b/etc/river.toml index bdb8e7db..0817690d 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -1,26 +1,115 @@ +# MySQL address, user and password +# user must have replication privilege in MySQL. my_addr = "mysqlhost:3306" my_user = "root" my_pass = "root" my_charset = "utf8" + +# Set true when elasticsearch use https +#es_https = false +# Elasticsearch address es_addr = "eshost:9200" +# Elasticsearch user and password, maybe set by shield, nginx, or x-pack +#es_user = "" +#es_pass = "" + +# Path to store data, like master.info, if not set or empty, +# we must use this to support breakpoint resume syncing. +# TODO: support other storage, like etcd. data_dir = "./var" + +# Inner Http status address stat_addr = "127.0.0.1:12800" -server_id = 1234 -flavor = "mysql" + +# pseudo server id like a slave +server_id = 1001 + +# mysql or mariadb +flavor = "mariadb" + +# mysqldump execution path +# if not set or empty, ignore mysqldump. mysqldump = "mysqldump" + +# if we have no privilege to use mysqldump with --master-data, +# we must skip it. +#skip_master_data = false + +# minimal items to be inserted in one bulk bulk_size = 128 + +# force flush the pending requests if we don't have enough items >= bulk_size flush_bulk_time = "200ms" + +# Ignore table without primary key skip_no_pk_table = false + +# MySQL data source [[source]] -schema = "RUNOOB" -tables = ["runoob_tbl"] +schema = "nfvofcaps" + +# Only below tables will be synced into Elasticsearch. +# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 +# I don't think it is necessary to sync all tables in a database. +#tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] +tables = ["FMALARM"] +# Below is for special rule mapping + +# Very simple example +# +# desc t; +# +-------+--------------+------+-----+---------+-------+ +# | Field | Type | Null | Key | Default | Extra | +# +-------+--------------+------+-----+---------+-------+ +# | id | int(11) | NO | PRI | NULL | | +# | name | varchar(256) | YES | | NULL | | +# +-------+--------------+------+-----+---------+-------+ +# +# The table `t` will be synced to ES index `test` and type `t`. [[rule]] -schema = "RUNOOB" -table = "runoob_tbl" -index = "gomysql110" -type = "go" -id = ["runoob_id"] +schema = "nfvofcaps" +table = "FMALARM" +index = "nfvomysql" +type = "mysqltable" + +# The es doc's id will be `id`:`tag` +# It is useful for merge muliple table into one type while theses tables have same PK +id = ["ID","ALARMID"] [rule.field] -runoob_title="runoob_title,date" +# Map column `id` to ES field `es_id` +#id="es_id" +# Map column `tags` to ES field `es_tags` with array type +#tags="es_tags,list" +# Map column `keywords` to ES with array type +#keywords=",list" + + +ORIGIN="origin" +ID="id" +ALARMID="alarmid" +ALARMTITLE="alarmtitle" +ALARMSTATUS="alarmstatus" +ORIGSEVERITY="origseverity" +ALARMTYPE="alarmtype" +EVENTTIME="eventtime,date" +MSGSEQ="msgseq" +CLEARTIME="cleartime,date" +CLEARFLAG="clearflag" +CLEARMSGSEQ="clearmsgseq" +SPECIFICPROBLEMID="specificproblemid" +SPECIFICPROBLEM="specificproblem" +NEUID="neuid" +NENAME="nename" +NETYPE="netype" +OBJECTUID="objectuid" +OBJECTNAME="objectname" +OBJECTTYPE="objecttype" +LOCATIONINFO="locationinfo" +ADDINFO="addinfo" +PVFLAG="pvflag" +CONFIRMFLAG="confirmflag" +CONFIRMTIME="confirmtime,date" +REMARK="remark" +REMARKTIME="remarktime,date" From 3086cd2c971c903238a9a1753cbcb260031ac543 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=A4=BE=E8=8B=B1?= Date: Wed, 9 Jan 2019 17:12:35 +0800 Subject: [PATCH 13/13] Update sync.go --- river/sync.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/river/sync.go b/river/sync.go index 66325abd..d0c57bfc 100644 --- a/river/sync.go +++ b/river/sync.go @@ -514,15 +514,14 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat)) } - } else { - if col.Type == schema.TYPE_STRING { + } else if col.Type == schema.TYPE_STRING { v := r.makeReqColumnData(col, value) str, _ := v.(string) stamp, _ := time.ParseInLocation(time.RFC3339 , str, time.Local) t := int64(stamp.Unix()) col.Type = schema.TYPE_DATETIME fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat)) - } + } }