-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathscheduler.js
129 lines (102 loc) · 3.13 KB
/
scheduler.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
const mqtt = require('mqtt');
const waterfall = require('async').waterfall;
const each = require('async').each;
const fs = require('fs');
const path = require('path');
const forever = require('async').forever;
const datejs = require('date.js');
module.exports = class Scheduler {
constructor(opts, cb){
if(cb) this.cb;
//Save options passed for potential future reference
this.opts = opts;
//Handle loading of files
this.filesLoaded = false;
this.scheduledTasks = [];
this.scheduleFile = this.opts.scheduleFile;
delete this.opts.scheduleFile;
this._loadFile();
//Handle MQTT host data
if(!this.opts.host) throw new Error("Missing required MQTT host server")
this.host = this.opts.host;
//Handle optional close events.
this.onMessage = this.opts.onMessage ? this.opts.onMessage : function(){};
delete this.opts.onMessage;
this.onConnect = this.opts.onConnect ? this.opts.onConnect : function(){};
delete this.opts.onConnect;
this.onError = this.opts.onError ? this.opts.onError : function(error){ throw new Error(error) };
delete this.opts.onError;
this.onClose = this.opts.onClose ? this.opts.onClose : function(){};
delete this.opts.onClose;
this.onTaskFire = this.opts.onTaskFire ? this.opts.onTaskFire : function(){};
delete this.opts.onTaskFire;
//Prepare connection
this.connected = false;
this.client = mqtt.connect(this.opts);
//Attach events
var self = this;
this.client.on('connect', function(){
self._onConnect();
});
this.client.on('message', this.onMessage);
this.client.on("error", this.onError);
this.client.on("close", this.onClose);
this.client.on("task-fire", this.onTaskFire);
//Setup forever checker
this.checker = forever(done =>{
this._checkForTasks(done);
}, ()=>{});
}
_loadFile(){
var self = this;
fs.readFile(self.scheduleFile, (err, data)=>{
console.log(data.toString());
data = data.toString();
//Parse the data
var lines = data.split("\n");
lines.forEach((line)=>{
var parsedTask = line.split("\t");
if(parsedTask.length != 4) return;
self.scheduledTasks.push({
name: parsedTask[0],
interval: parsedTask[1],
topic: parsedTask[2],
payload: parsedTask[3].replace(/[\r\n]/g, ''),
nextFire: datejs(parsedTask[1])
});
});
});
}
_onConnect(){
// First, call our custom on connect
this.onConnect();
this.connected = true;
}
_handleCallback(err){
if(err && this.cb && !this._callbackTriggered){
this._callbackTriggered = true;
this.cb(err);
} else if(err){
this._callbackTriggered = true;
throw new Error(err);
} else if(this.cb && this.connected && this.filesLoaded){
this._callbackTriggered = true;
this.cb();
}
}
_checkForTasks(done){
if(!this.scheduledTasks || this.scheduledTasks.length == 0) return setTimeout(done, 10);
var now = new Date()
this.scheduledTasks.forEach((task, index)=>{
if(task.nextFire <= now){
this.scheduledTasks[index].nextFire = datejs(task.interval);
this._handleTask(task);
}
});
setTimeout(done, 10);
}
_handleTask(task){
this.onTaskFire(task);
this.client.publish(task.topic, task.payload);
}
};