-
Notifications
You must be signed in to change notification settings - Fork 4
/
log-reader.coffee
226 lines (190 loc) · 7.16 KB
/
log-reader.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
module.exports = (env) ->
# ##Dependencies
# * from node.js
util = require 'util'
# * pimatic imports.
Promise = env.require 'bluebird'
assert = env.require 'cassert'
_ = env.require 'lodash'
M = env.matcher
Tail = env.Tail or require('tail').Tail
t = env.require('decl-api').types
LineByLineReader = require("line-by-line")
# ##The LogReaderPlugin
# Plugin entry class. Its `init` hooks the log-watcher predicate provider into
# the rule manager and makes the "LogWatcher" device class known to the
# device manager.
class LogReaderPlugin extends env.plugins.Plugin

  # * `app` – not used by this plugin.
  # * `framework` – stored on the instance; provides `ruleManager` and `deviceManager`.
  # * `config` – stored on the instance.
  init: (app, @framework, @config) ->
    # Rule predicates that react to matching log lines.
    predicateProvider = new LogWatcherPredicateProvider(@framework)
    @framework.ruleManager.addPredicateProvider predicateProvider

    # Device class registration, using the schema shipped next to this file.
    schema = require("./device-config-schema")
    buildWatcher = (config, lastState) -> new LogWatcher(config, lastState)
    @framework.deviceManager.registerDeviceClass "LogWatcher",
      configDef: schema.LogWatcher
      createCallback: buildWatcher

# The single plugin instance returned by this module.
plugin = new LogReaderPlugin()
# ##LogWatcher Sensor
# Sensor whose attribute values are derived from the lines of a log file.
# The file named by `config.file` is first read from the beginning to establish
# the current values and is then tailed for newly appended lines.
class LogWatcher extends env.devices.Sensor

  # * `config` – device config; this class reads `id`, `name`, `template`,
  #   `file`, `attributes` and `lines` from it. String entries in
  #   `config.attributes` are rewritten in place to `{name, type: "string"}`.
  # * `lastState` – optional map of attribute name to an object with a `value`
  #   field; a value is reused only when its `typeof` equals the attribute type.
  #
  # Throws an `Error` when an attribute declares a type other than
  # "string", "number" or "boolean".
  constructor: (@config, lastState) ->
    @id = @config.id
    @name = @config.name
    if @config.template?
      @template = @config.template
    @attributeValue = {}
    @changedAttributeValue = {}
    @attributes = {}
    # Set by `destroy`; stops the reader from restarting itself afterwards.
    @_readerStopped = no
    # Handle of the pending retry timer, if any, so that it can be cancelled.
    @_retryTimer = null

    # Initialise all attributes.
    for attr, i in @config.attributes
      do (attr) =>
        # Legacy support: a bare string is the name of a string attribute.
        if typeof attr is "string"
          attr = {
            name: attr
            type: "string"
          }
          @config.attributes[i] = attr

        name = attr.name
        assert attr.name?
        assert attr.type?

        # Restore the last known value only if it has the declared type.
        lastValue = lastState?[name]?.value
        unless typeof lastValue is attr.type
          lastValue = null

        switch attr.type
          when "string"
            # Start from the restored value (null when there is none).
            @attributeValue[name] = lastValue
            # Collect every value the configured lines assign to this attribute.
            possibleValues = _.map(_.filter(@config.lines, (l) => l[name]?), (l) => l[name])
            # Add the attribute definition.
            @attributes[name] =
              description: name
              type: t.string
              enum: possibleValues
          when "number"
            @attributeValue[name] = lastValue
            @attributes[name] =
              description: name
              type: t.number
            @attributes[name].unit = attr.unit if attr.unit?
          when "boolean"
            @attributeValue[name] = lastValue
            @attributes[name] =
              description: name
              type: t.boolean
          else
            throw new Error("Illegal type: #{attr.type} for attributes #{name} in LogWatcher.")

        # Copy the optional display settings into the attribute definition.
        for property in ['label', 'acronym', 'discrete']
          @attributes[name][property] = attr[property] if attr[property]?

        if _.isArray attr.labels
          @attributes[name].labels = attr.labels

        # Create a getter for this attribute.
        @_createGetter name, ( => Promise.resolve @attributeValue[name] )

    # Test one line of the file against every configured line pattern.
    onLine = (data) =>
      for line in @config.lines
        matches = new RegExp(line.match).exec(data)
        if matches?
          # If a match occurs then emit a "match" event.
          @emit 'match', line, data, matches
      return

    # `no` while the file is read for initial values, `yes` once tailing started.
    @_tailing = no

    # Apply the attribute values that the matched line config defines.
    onMatch = (line, data, matches) =>
      for attr in @config.attributes
        # Only attributes that are mentioned in the matched line config.
        if attr.name of line
          valueToSet = line[attr.name]
          value = null
          if attr.type is "boolean"
            value = valueToSet
          else
            # A "$<n>" placeholder selects capture group n of the line pattern.
            # Only strings can hold a placeholder; other configured values
            # (e.g. a numeric literal) are used as they are instead of
            # failing on a missing `.match` method.
            placeholder = if _.isString(valueToSet) then valueToSet.match(/\$(\d+)/) else null
            if placeholder?
              value = matches[parseInt(placeholder[1], 10)]
            else
              value = valueToSet
            if attr.type is "number" then value = parseFloat(value)

          if @_tailing
            # Live line: store the value and emit the attribute event.
            @attributeValue[attr.name] = value
            @emit(attr.name, value)
          else
            # Initial read: remember changes, they are emitted at end of file.
            if @attributeValue[attr.name] isnt value
              @attributeValue[attr.name] = value
              @changedAttributeValue[attr.name] = value
      return

    # When a match event occurs
    @on 'match', onMatch

    # Delay before the next read attempt; grows on each attempt and is reset
    # once the file could be opened.
    retryTimeout = 0

    # Log a read or tail failure and schedule a single new read attempt,
    # unless the device has been destroyed in the meantime.
    onReadError = (err) =>
      env.logger.error err.message
      env.logger.debug err.stack
      return if @_readerStopped
      # Replace an already pending retry instead of stacking up several.
      clearTimeout @_retryTimer if @_retryTimer?
      @_retryTimer = setTimeout reader, retryTimeout
      return

    # Read the file to get initial values, then switch over to tailing it.
    reader = () =>
      @_retryTimer = null
      return if @_readerStopped
      retryTimeout += 10000 if retryTimeout <= 60000
      @lr.removeAllListeners() if @lr?
      @lr = new LineByLineReader(@config.file)
      @lr.on "error", onReadError
      @lr.on "open", => retryTimeout = 0
      @lr.on "line", onLine
      @lr.on "end", =>
        # Do not start tailing for a device that was destroyed while reading.
        return if @_readerStopped
        @_tailing = yes
        # Publish the values that changed during the initial read.
        for attrName, value of @changedAttributeValue
          @emit(attrName, value)
        @changedAttributeValue = {}
        # The full file has been read, so tail it from now on.
        tail = @tail = new Tail(@config.file)
        # On every new line in the log file
        tail.on 'line', onLine
        tail.on 'error', (errMessage) =>
          # The handler was written for a message string; accept an Error
          # object (or anything else) as well rather than failing on `.replace`.
          message =
            if _.isString(errMessage) then errMessage
            else errMessage?.message ? String(errMessage)
          message = message.replace ': undefined', ''
          # Release the tail that actually failed, which is not necessarily
          # the one currently stored in `@tail`.
          tail.unwatch()
          tail.removeAllListeners()
          @tail = null if @tail is tail
          onReadError new Error(message)
        return
      return

    reader()
    super()

  # Stop watching the file and release the reader, the tail and any pending
  # retry timer before handing over to the parent class.
  destroy: () ->
    @_readerStopped = yes
    if @_retryTimer?
      clearTimeout @_retryTimer
      @_retryTimer = null
    @removeAllListeners 'match'
    @lr.close() if @lr?
    @tail.unwatch() if @tail?
    super()
# Predicate provider that turns the `predicate` phrases configured on the
# lines of LogWatcher devices into rule predicates.
class LogWatcherPredicateProvider extends env.predicates.PredicateProvider

  listener: []

  constructor: (@framework) ->

  # Look through all LogWatcher devices for a configured line whose predicate
  # matches the start of `input`. Returns `{token, nextInput, predicateHandler}`
  # for the first hit, or `null` when nothing matches.
  parsePredicate: (input, context) ->
    allDevices = @framework.deviceManager.devices
    for id, device of allDevices when device instanceof LogWatcher
      found = @_getLineWithPredicate device.config, input, context
      continue unless found?
      handler = new LogWatcherPredicateHandler(this, device, found.line)
      return {
        token: found.token
        nextInput: input.substring(found.token.length)
        predicateHandler: handler
      }
    return null

  # Find the first line in `config.lines` whose `predicate` is matched by
  # `input`. Returns `{line, token, nextInput}` or `null`.
  _getLineWithPredicate: (config, input, context) ->
    for line in config.lines when line.predicate?
      matcher = M(input, context).match(line.predicate)
      continue unless matcher.hadMatch()
      fullMatch = matcher.getFullMatch()
      remaining = input.substring(fullMatch.length)
      return {line: line, token: fullMatch, nextInput: remaining}
    return null
# Event-type predicate handler bound to one configured line of one
# LogWatcher device.
class LogWatcherPredicateHandler extends env.predicates.PredicateHandler

  constructor: (@provider, @device, @line) ->

  # Subscribe to the device's 'match' events.
  setup: ->
    @deviceListener = (line, data) =>
      # Matches seen while the device is not tailing are ignored.
      return unless @device._tailing
      # Only react to the line this handler was created for.
      return unless line.match is @line.match
      @emit 'change', 'event'
    @device.addListener 'match', @deviceListener
    super()

  # Always resolves to false.
  getValue: ->
    Promise.resolve(false)

  # Unsubscribe from the device again.
  destroy: ->
    @device.removeListener 'match', @deviceListener
    super()

  getType: ->
    'event'
# For testing: attach the plugin and predicate provider classes to `this`,
# i.e. to whatever context the module factory function is called with.
@LogReaderPlugin = LogReaderPlugin
@LogWatcherPredicateProvider = LogWatcherPredicateProvider
# Export the plugin: the factory returns the single plugin instance.
return plugin