Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql时间字符串(varchar "2018-11-20 10:10:10")支持转为ES的date类型 #305

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ Although there are some other MySQL rivers for Elasticsearch, like [elasticsearc
## Todo

+ MySQL 8
+ ES 6
+ ES 6 (After verification (version 6.4.2), it is now supported. Delete and update are supported)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Em, are you sure we can support ES 6 directly? Seem ES 6 has already removed doc Type?

+ Statistic.

## Donate
Expand Down
133 changes: 50 additions & 83 deletions etc/river.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_addr = "mysqlhost:3306"
my_user = "root"
my_pass = ""
my_pass = "root"
my_charset = "utf8"

# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "127.0.0.1:9200"
es_addr = "eshost:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""
#es_user = ""
#es_pass = ""

# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
Expand All @@ -25,7 +25,7 @@ stat_addr = "127.0.0.1:12800"
server_id = 1001

# mysql or mariadb
flavor = "mysql"
flavor = "mariadb"

# mysqldump execution path
# if not set or empty, ignore mysqldump.
Expand All @@ -46,13 +46,13 @@ skip_no_pk_table = false

# MySQL data source
[[source]]
schema = "test"
schema = "nfvofcaps"

# Only below tables will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
# I don't think it is necessary to sync all tables in a database.
tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]

#tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
tables = ["FMALARM"]
# Below is for special rule mapping

# Very simple example
Expand All @@ -67,82 +67,49 @@ tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
#
# The table `t` will be synced to ES index `test` and type `t`.
[[rule]]
schema = "test"
table = "t"
index = "test"
type = "t"

# Wildcard table rule, the wildcard table must be in source tables
# All tables which match the wildcard format will be synced to ES index `test` and type `t`.
# In this example, all tables must have same schema with above table `t`;
[[rule]]
schema = "test"
table = "t_[0-9]{4}"
index = "test"
type = "t"

# Simple field rule
#
# desc tfield;
# +----------+--------------+------+-----+---------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id | int(11) | NO | PRI | NULL | |
# | tags | varchar(256) | YES | | NULL | |
# | keywords | varchar(256) | YES | | NULL | |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tfield"
index = "test"
type = "tfield"
schema = "nfvofcaps"
table = "FMALARM"
index = "nfvomysql"
type = "mysqltable"

# The es doc's id will be `id`:`tag`
# It is useful for merge muliple table into one type while theses tables have same PK
id = ["ID","ALARMID"]


[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
#id="es_id"
# Map column `tags` to ES field `es_tags` with array type
tags="es_tags,list"
#tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"

# Filter rule
#
# desc tfilter;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id | int(11) | NO | PRI | NULL | |
# | c1 | int(11) | YES | | 0 | |
# | c2 | int(11) | YES | | 0 | |
# | name | varchar(256) | YES | | NULL | |
# +-------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tfilter"
index = "test"
type = "tfilter"

# Only sync following columns
filter = ["id", "name"]

# id rule
#
# desc tid_[0-9]{4};
# +----------+--------------+------+-----+---------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id | int(11) | NO | PRI | NULL | |
# | tag | varchar(256) | YES | | NULL | |
# | desc | varchar(256) | YES | | NULL | |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tid_[0-9]{4}"
index = "test"
type = "t"
# The es doc's id will be `id`:`tag`
# It is useful for merge muliple table into one type while theses tables have same PK
id = ["id", "tag"]
#keywords=",list"


ORIGIN="origin"
ID="id"
ALARMID="alarmid"
ALARMTITLE="alarmtitle"
ALARMSTATUS="alarmstatus"
ORIGSEVERITY="origseverity"
ALARMTYPE="alarmtype"
EVENTTIME="eventtime,date"
MSGSEQ="msgseq"
CLEARTIME="cleartime,date"
CLEARFLAG="clearflag"
CLEARMSGSEQ="clearmsgseq"
SPECIFICPROBLEMID="specificproblemid"
SPECIFICPROBLEM="specificproblem"
NEUID="neuid"
NENAME="nename"
NETYPE="netype"
OBJECTUID="objectuid"
OBJECTNAME="objectname"
OBJECTTYPE="objecttype"
LOCATIONINFO="locationinfo"
ADDINFO="addinfo"
PVFLAG="pvflag"
CONFIRMFLAG="confirmflag"
CONFIRMTIME="confirmtime,date"
REMARK="remark"
REMARKTIME="remarktime,date"
10 changes: 9 additions & 1 deletion river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,15 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
}
}
} else if col.Type == schema.TYPE_STRING {
v := r.makeReqColumnData(col, value)
str, _ := v.(string)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch v.(type) {
    case string:
    case []byte:
    default:
}

I think you can check string and []byte here.

stamp, _ := time.ParseInLocation(time.RFC3339 , str, time.Local)
t := int64(stamp.Unix())
col.Type = schema.TYPE_DATETIME
fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))

}
}

if fieldValue == nil {
Expand Down