Merge branch 'master' of github.com:qiniu/logkit into shangmin02
shangmin-001 committed May 28, 2021
2 parents 178d6e0 + f3eebb9 commit eb01537
Showing 6 changed files with 178 additions and 9 deletions.
29 changes: 29 additions & 0 deletions cleaner/cleaner.go
@@ -2,6 +2,8 @@ package cleaner

import (
"path/filepath"
"runtime/debug"
"sync/atomic"
"time"

"github.com/bmatcuk/doublestar"
@@ -23,6 +25,7 @@ type Cleaner struct {
cleanChan chan<- CleanSignal
name string
logdir string
status int32
}

type CleanSignal struct {
@@ -91,10 +94,19 @@ func NewCleaner(conf conf.MapConf, meta *reader.Meta, cleanChan chan<- CleanSign
cleanChan: cleanChan,
name: name,
logdir: logdir,
status: config.StatusInit,
}, nil
}

func (c *Cleaner) Run() {
if !atomic.CompareAndSwapInt32(&c.status, config.StatusInit, config.StatusRunning) {
if c.hasStopped() {
log.Warnf("cleaner[%v] has stopped, run operation ignored", c.name)
} else {
log.Warnf("cleaner[%v] has already running, run operation ignored", c.name)
}
return
}
for {
select {
case <-c.exitChan:
@@ -110,9 +122,17 @@ func (c *Cleaner) Run() {
}

func (c *Cleaner) Close() {
if !atomic.CompareAndSwapInt32(&c.status, config.StatusRunning, config.StatusStopped) {
log.Warnf("cleaner[%v] is not running, close operation ignored", c.name)
return
}
c.exitChan <- struct{}{}
}

func (c *Cleaner) hasStopped() bool {
return atomic.LoadInt32(&c.status) == config.StatusStopped
}

func (c *Cleaner) Name() string {
return c.name
}
@@ -160,6 +180,15 @@ func (c *Cleaner) checkBelong(path string) bool {
}

func (c *Cleaner) Clean() (err error) {
defer func() {
if rec := recover(); rec != nil {
log.Errorf("cleaner %q was panicked and recovered from %v\nstack: %s", c.Name(), rec, debug.Stack())
}
}()
if c.hasStopped() {
log.Warnf("cleaner[%v] reader %s has stopped, skip clean operation", c.name)
return
}
var size int64 = 0
var count int64 = 0
beginClean := false
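
The status guard added above is a small compare-and-swap state machine: Run may only move the cleaner from Init to Running, and Close only from Running to Stopped, so duplicate or out-of-order calls degrade into logged no-ops instead of double-closing the exit channel. A minimal, self-contained sketch of the pattern, with constant names and the work loop simplified from the real Cleaner:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const (
	statusInit int32 = iota
	statusRunning
	statusStopped
)

type worker struct {
	status   int32
	exitChan chan struct{}
}

// Run claims the Init -> Running transition exactly once; later calls are no-ops.
func (w *worker) Run() {
	if !atomic.CompareAndSwapInt32(&w.status, statusInit, statusRunning) {
		fmt.Println("run ignored: already running or stopped")
		return
	}
	<-w.exitChan // stand-in for the real cleaning loop
}

// Close claims the Running -> Stopped transition exactly once.
func (w *worker) Close() {
	if !atomic.CompareAndSwapInt32(&w.status, statusRunning, statusStopped) {
		fmt.Println("close ignored: not running")
		return
	}
	w.exitChan <- struct{}{}
}

func main() {
	w := &worker{exitChan: make(chan struct{})}
	go w.Run()
	time.Sleep(10 * time.Millisecond) // let Run win the CAS first
	w.Close()
	w.Close() // ignored: already stopped
}
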
13 changes: 12 additions & 1 deletion sender/config/config.go
@@ -861,7 +861,7 @@ var ModeKeyOptions = map[string][]Option{
KeyName: KeyElasticVersion,
ChooseOnly: true,
Default: ElasticVersion5,
ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6},
ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6, ElasticVersion7},
Description: "ES版本号(es_version)",
},
{
@@ -882,6 +882,17 @@
DefaultNoUse: true,
Description: "索引类型名称(elastic_type)",
},
{
KeyName: KeyElasticIDField,
ChooseOnly: false,
Advance: false,
Default: "",
Required: false,
Placeholder: "id",
DefaultNoUse: true,
Description: "索引id字段名(elastic_id_field)",
ToolTip: "默认随机生成id,使用数据中该字段的值作为id,若字段不存在则由es随机生成",
},
OptionAuthUsername,
OptionAuthPassword,
{
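
With the option above, the sender can derive document ids from a field of the data instead of letting Elasticsearch generate them. A hypothetical sender block for illustration; elastic_id_field and the "7.x" version value come from this commit, while the surrounding keys and values are only meant to show where the option sits:

{
    "name": "es_sender",
    "sender_type": "elasticsearch",
    "elastic_host": ["http://127.0.0.1:9200"],
    "elastic_version": "7.x",
    "elastic_index": "logkit_logs",
    "elastic_id_field": "id"
}
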
13 changes: 8 additions & 5 deletions sender/config/models.go
@@ -118,6 +118,7 @@ const (
KeyElasticAlias = "elastic_keys"
KeyElasticIndexStrategy = "elastic_index_strategy"
KeyElasticTimezone = "elastic_time_zone"
KeyElasticIDField = "elastic_id_field"

KeyDefaultIndexStrategy = "default"
KeyYearIndexStrategy = "year"
@@ -130,6 +131,8 @@ const (
ElasticVersion5 = "5.x"
// ElasticVersion6 v6.x
ElasticVersion6 = "6.x"
// ElasticVersion7 v7.x
ElasticVersion7 = "7.x"

//timeZone
KeylocalTimezone = "Local"
@@ -208,11 +211,11 @@ const (
KeyKafkaCompressionSnappy = "snappy"
KeyKafkaCompressionLZ4 = "lz4"

KeyKafkaHost = "kafka_host" //主机地址,可以有多个
KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值
KeyKafkaClientId = "kafka_client_id" //客户端ID
KeySaslUsername = "kafka_sasl_username" //SASL用户名
KeySaslPassword = "kafka_sasl_password" //SASL密码
KeyKafkaHost = "kafka_host" //主机地址,可以有多个
KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值
KeyKafkaClientId = "kafka_client_id" //客户端ID
KeySaslUsername = "kafka_sasl_username" //SASL用户名
KeySaslPassword = "kafka_sasl_password" //SASL密码
//KeyKafkaFlushNum = "kafka_flush_num" // number of buffered entries
//KeyKafkaFlushFrequency = "kafka_flush_frequency" // flush interval
KeyKafkaRetryMax = "kafka_retry_max" // maximum number of retries
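
The two-value form of kafka_topic described in the comment above is easiest to read in an example. A hypothetical setting (the field name app and the fallback topic are illustrative):

"kafka_topic": "%{[app]}, default_topic"

For each event, the value of its app field is used as the topic; events without that field go to default_topic.
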
120 changes: 117 additions & 3 deletions sender/elasticsearch/elasticsearch.go
@@ -12,6 +12,7 @@ import (
elasticV6 "github.com/olivere/elastic"
elasticV3 "gopkg.in/olivere/elastic.v3"
elasticV5 "gopkg.in/olivere/elastic.v5"
elasticV7 "gopkg.in/olivere/elastic.v7"

"github.com/qiniu/log"
"github.com/qiniu/pandora-go-sdk/base/reqerr"
@@ -31,11 +32,13 @@ type Sender struct {
host []string
retention int
indexName string
idField string
eType string
eVersion string
elasticV3Client *elasticV3.Client
elasticV5Client *elasticV5.Client
elasticV6Client *elasticV6.Client
elasticV7Client *elasticV7.Client

aliasFields map[string]string

@@ -73,6 +76,7 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) {
return
}
logkitSendTime, _ := conf.GetBoolOr(KeyLogkitSendTime, true)
idField, _ := conf.GetStringOr(KeyElasticIDField, "")
eType, _ := conf.GetStringOr(KeyElasticType, defaultType)
name, _ := conf.GetStringOr(KeyName, fmt.Sprintf("elasticSender:(elasticUrl:%s,index:%s,type:%s)", host, index, eType))
fields, _ := conf.GetAliasMapOr(KeyElasticAlias, make(map[string]string))
@@ -93,7 +97,24 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) {
var elasticV3Client *elasticV3.Client
var elasticV5Client *elasticV5.Client
var elasticV6Client *elasticV6.Client
var elasticV7Client *elasticV7.Client
switch eVersion {
case ElasticVersion7:
optFns := []elasticV7.ClientOptionFunc{
elasticV7.SetSniff(false),
elasticV7.SetHealthcheck(false),
elasticV7.SetURL(host...),
elasticV7.SetGzip(enableGzip),
}

if len(authUsername) > 0 && len(authPassword) > 0 {
optFns = append(optFns, elasticV7.SetBasicAuth(authUsername, authPassword))
}

elasticV7Client, err = elasticV7.NewClient(optFns...)
if err != nil {
return nil, err
}
case ElasticVersion6:
optFns := []elasticV6.ClientOptionFunc{
elasticV6.SetSniff(false),
@@ -152,10 +173,12 @@ func NewSender(conf conf.MapConf) (elasticSender sender.Sender, err error) {
name: name,
host: host,
indexName: index,
idField: idField,
eVersion: eVersion,
elasticV3Client: elasticV3Client,
elasticV5Client: elasticV5Client,
elasticV6Client: elasticV6Client,
elasticV7Client: elasticV7Client,
eType: eType,
aliasFields: fields,
intervalIndex: i,
@@ -184,6 +207,78 @@ func (s *Sender) Name() string {
// Send ElasticSearchSender
func (s *Sender) Send(datas []Data) error {
switch s.eVersion {
case ElasticVersion7:
bulkService := s.elasticV7Client.Bulk()

makeDoc := true
if len(s.aliasFields) == 0 {
makeDoc = false
}
var indexName string
for _, doc := range datas {
// compute the index name
indexName = buildIndexName(s.indexName, s.timeZone, s.intervalIndex)
// apply field-name aliases
if makeDoc {
doc = s.wrapDoc(doc)
}
// stamp the send time
if s.logkitSendTime {
doc[KeySendTime] = time.Now().In(s.timeZone).UnixNano() / 1000000
}
doc2 := doc

request := elasticV7.NewBulkIndexRequest().UseEasyJSON(true).Index(indexName).Type(s.eType).Doc(&doc2)
id, ok := doc[s.idField].(string)
if ok && id != "" {
request.Id(id)
}
bulkService.Add(request)
}

resp, err := bulkService.Do(context.Background())
if err != nil {
return err
}

var (
// find the failed operations and map them back to the original data for the caller
lastFailedResult *elasticV7.BulkResponseItem
failedDatas = make([]map[string]interface{}, len(datas))
failedDatasIdx = 0
)
for i, item := range resp.Items {
for _, result := range item {
if !(result.Status >= 200 && result.Status <= 299) {
failedDatas[failedDatasIdx] = datas[i]
failedDatasIdx++
lastFailedResult = result
break // any failure marks the whole record as failed; no need to check further
}
}
}
failedDatas = failedDatas[:failedDatasIdx]
if len(failedDatas) == 0 {
return nil
}
lastError, err := jsoniter.MarshalToString(lastFailedResult)
if err != nil {
lastError = fmt.Sprintf("marshal to string failed: %v", lastFailedResult)
}

return &StatsError{
StatsInfo: StatsInfo{
Success: int64(len(datas) - len(failedDatas)),
Errors: int64(len(failedDatas)),
LastError: lastError,
},
SendError: reqerr.NewSendError(
fmt.Sprintf("bulk failed with last error: %s", lastError),
failedDatas,
reqerr.TypeBinaryUnpack,
),
}

case ElasticVersion6:
bulkService := s.elasticV6Client.Bulk()

@@ -204,7 +299,13 @@ func (s *Sender) Send(datas []Data) error {
doc[KeySendTime] = time.Now().In(s.timeZone).UnixNano() / 1000000
}
doc2 := doc
bulkService.Add(elasticV6.NewBulkIndexRequest().UseEasyJSON(true).Index(indexName).Type(s.eType).Doc(&doc2))

request := elasticV6.NewBulkIndexRequest().UseEasyJSON(true).Index(indexName).Type(s.eType).Doc(&doc2)
id, ok := doc[s.idField].(string)
if ok && id != "" {
request.Id(id)
}
bulkService.Add(request)
}

resp, err := bulkService.Do(context.Background())
@@ -270,7 +371,12 @@ func (s *Sender) Send(datas []Data) error {
doc[KeySendTime] = curTime
}
doc2 := doc
bulkService.Add(elasticV5.NewBulkIndexRequest().Index(indexName).Type(s.eType).Doc(&doc2))
request := elasticV5.NewBulkIndexRequest().Index(indexName).Type(s.eType).Doc(&doc2)
id, ok := doc[s.idField].(string)
if ok && id != "" {
request.Id(id)
}
bulkService.Add(request)
}

resp, err := bulkService.Do(context.Background())
@@ -335,7 +441,12 @@ func (s *Sender) Send(datas []Data) error {
doc[KeySendTime] = time.Now().In(s.timeZone).UnixNano() / 1000000
}
doc2 := doc
bulkService.Add(elasticV3.NewBulkIndexRequest().Index(indexName).Type(s.eType).Doc(&doc2))
request := elasticV3.NewBulkIndexRequest().Index(indexName).Type(s.eType).Doc(&doc2)
id, ok := doc[s.idField].(string)
if ok && id != "" {
request.Id(id)
}
bulkService.Add(request)
}

resp, err := bulkService.Do()
@@ -409,6 +520,9 @@ func (s *Sender) Close() error {
if s.elasticV6Client != nil {
s.elasticV6Client.Stop()
}
if s.elasticV7Client != nil {
s.elasticV7Client.Stop()
}
return nil
}

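
In all four version branches of Send, the id handling has the same shape: only a non-empty string value of the configured field is used as the document id, while a missing field, a non-string value, or an empty string leaves the request without an explicit id so Elasticsearch generates one. The per-document logic, distilled into a standalone sketch (docID is not part of the commit):

package main

import "fmt"

// docID reports whether doc carries a usable id in idField:
// only a non-empty string value qualifies.
func docID(doc map[string]interface{}, idField string) (string, bool) {
	id, ok := doc[idField].(string)
	return id, ok && id != ""
}

func main() {
	doc := map[string]interface{}{"id": "abc123", "n": 42}
	fmt.Println(docID(doc, "id")) // abc123 true
	fmt.Println(docID(doc, "n"))  // non-string values are ignored: "" false
}
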
8 changes: 8 additions & 0 deletions transforms/mutate/replace.go
@@ -18,6 +18,8 @@ var (
type Replacer struct {
StageTime string `json:"stage"`
Key string `json:"key"`
OldKey string `json:"old"` // backwards compatibility with older configs
NewKey string `json:"new"`
Old string `json:"old_string"`
New string `json:"new_string"`
Regex bool `json:"regex"`
@@ -29,6 +31,12 @@ type Replacer struct {
}

func (g *Replacer) Init() error {
if g.Old == "" {
g.Old = g.OldKey
}
if g.New == "" {
g.New = g.NewKey
}
rgexpr := g.Old
if !g.Regex {
rgexpr = regexp.QuoteMeta(g.Old)
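
The fallback in Init keeps configs written against the older field names loading unchanged: old and new are read only when old_string and new_string are empty. A hypothetical transform config in the current spelling (the key and the replacement values are illustrative):

{
    "type": "replace",
    "key": "message",
    "old_string": "password=.*",
    "new_string": "password=***",
    "regex": true
}

A legacy config that carries the same two values under "old" and "new" keeps working through the fallback; the Spliter below applies the same trick to its renamed array field.
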
4 changes: 4 additions & 0 deletions transforms/mutate/split.go
@@ -19,13 +19,17 @@ type Spliter struct {
Key string `json:"key"`
SeparateKey string `json:"sep"`
ArrayName string `json:"new"`
ArrayNameNew string `json:"newfield"` // backwards compatibility with older configs

stats StatsInfo
keys []string
numRoutine int
}

func (g *Spliter) Init() error {
if g.ArrayName == "" {
g.ArrayName = g.ArrayNameNew
}
g.keys = GetKeys(g.Key)

numRoutine := MaxProcs
