diff --git a/bin/intake.js b/bin/intake.js index 5aa826e..daa65d2 100755 --- a/bin/intake.js +++ b/bin/intake.js @@ -1,10 +1,13 @@ #!/usr/bin/env node var moment = require('moment'), + net = require('net'), split = require('split'); JSON.stringifyCanonical = require('canonical-json'); +var config = require('../lib/config'); + var insertTemperatureQuery = ` INSERT INTO temperature_data ( measured_at, @@ -61,67 +64,110 @@ function schedulesAreEqual(a, b) { ); } +var updateListener = net.connect(config.updateListener.port, 'localhost', function() { + console.log('Connected to API socket'); + updateListener.connected = true; + updateListener.on('end', function() { + console.log('Disconnected from API socket'); + updateListener = null; + }); +}); + +updateListener.on('error', function(err) { + console.log('Failed to connect to API socket: ' + err.message); + updateListener = null; +}); + require('../lib/db').connect(function(err, db) { if (err) throw err; var lastSchedule = null; + function sendUpdate(obj, next) { + if (updateListener && updateListener.connected) { + var update = Object.assign({ + type : 'update', + }, obj); + delete update.schedule; + updateListener.write(JSON.stringify(update) + "\n"); + } + db.query(insertTemperatureQuery, [ + // measured_at DATETIME NOT NULL + moment.utc(obj.timestamp).toDate(), + // temp_1 MEDIUMINT + Math.round(obj.computed.T1 * 100), + // temp_2 MEDIUMINT + Math.round(obj.computed.T2 * 100), + // temp_3 MEDIUMINT + Math.round(obj.computed.T3 * 100), + // temp_avg MEDIUMINT NOT NULL + Math.round(obj.computed.temperature * 100), + // setpoint MEDIUMINT + obj.setpoint ? Math.round(obj.setpoint * 100) : null + ], function(err) { + console.log('updated'); + next(err); + }); + } + + function sendSchedule(obj, next) { + if (obj.schedule) { + if (!schedulesAreEqual(obj.schedule, lastSchedule)) { + if (updateListener && updateListener.connected) { + var update = { + type : 'schedule', + schedule : obj.schedule, + }; + updateListener.write(JSON.stringify(update) + "\n"); + } + db.query(insertScheduleQuery, [ + // changed_at DATETIME NOT NULL + moment.utc(obj.schedule.now).toDate(), + // schedule_started_at DATETIME + moment.utc(obj.schedule.startedAt).toDate(), + // step_started_at DATETIME + moment.utc(obj.schedule.stepStartedAt).toDate(), + // steps_json VARCHAR(1000) + JSON.stringifyCanonical(obj.schedule.steps) + ], function(err) { + console.log('updated schedule'); + next(err); + }); + } + } else { + next(null); + } + } + process.stdin.pipe(split()) .on('data', function(line) { if (!line.trim()) return; + var obj = null; try { - var obj = JSON.parse(line); - if ( - !obj.timestamp || - !obj.computed.T1 || - !obj.computed.T2 || - !obj.computed.T3 || - !obj.computed.temperature || - !('setpoint' in obj) - ) { - throw new Error('invalid object data'); - } - process.stdin.pause(); - db.query(insertTemperatureQuery, [ - // measured_at DATETIME NOT NULL - moment.utc(obj.timestamp).toDate(), - // temp_1 MEDIUMINT - Math.round(obj.computed.T1 * 100), - // temp_2 MEDIUMINT - Math.round(obj.computed.T2 * 100), - // temp_3 MEDIUMINT - Math.round(obj.computed.T3 * 100), - // temp_avg MEDIUMINT NOT NULL - Math.round(obj.computed.temperature * 100), - // setpoint MEDIUMINT - obj.setpoint ? Math.round(obj.setpoint * 100) : null - ], function(err) { - process.stdin.resume(); - if (err) throw err; - console.log('updated'); - }); + obj = JSON.parse(line); } catch (err) { console.error('Invalid object: ' + err.message); } - if (obj.schedule) { - if (!schedulesAreEqual(obj.schedule, lastSchedule)) { - process.stdin.pause(); - db.query(insertScheduleQuery, [ - // changed_at DATETIME NOT NULL - moment.utc(obj.schedule.now).toDate(), - // schedule_started_at DATETIME - moment.utc(obj.schedule.startedAt).toDate(), - // step_started_at DATETIME - moment.utc(obj.schedule.stepStartedAt).toDate(), - // steps_json VARCHAR(1000) - JSON.stringifyCanonical(obj.schedule.steps) - ], function(err) { - process.stdin.resume(); + if ( + !obj.timestamp || + !obj.computed.T1 || + !obj.computed.T2 || + !obj.computed.T3 || + !obj.computed.temperature || + !('setpoint' in obj) + ) { + console.error('invalid object data'); + obj = null; + } + if (obj) { + process.stdin.pause(); + sendUpdate(obj, function(err) { + if (err) throw err; + sendSchedule(obj, function(err) { if (err) throw err; - console.log('updated schedule'); - lastSchedule = obj.schedule; + process.stdin.resume(); }); - } + }); } }) .on('end', function() { diff --git a/package.json b/package.json index 75d17a8..3e04845 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "js-yaml" : "^3.6.1", "moment" : "^2.13.0", "mysql" : "^2.10.2", + "sockjs" : "^0.3.17", "split" : "^1.0.0", "suspend" : "^0.7.0" }, diff --git a/server.js b/server.js index e2f3d02..c444344 100644 --- a/server.js +++ b/server.js @@ -1,12 +1,19 @@ const cors = require('cors'); const express = require('express'); +const http = require('http'); const moment = require('moment'); +const net = require('net'); +const sockjs = require('sockjs'); +const split = require('split'); const config = require('./lib/config'); const TemperatureDataStore = require('./lib/TemperatureDataStore'); const app = express(); const apiRouter = express.Router(); +const server = http.createServer(app); + +let clientsToUpdate = []; require('./lib/db').connect((err, db) => { if (err) throw err; @@ -16,6 +23,35 @@ require('./lib/db').connect((err, db) => { console.log(`TemperatureDataStore: ${msg}`); }); + const updateListener = net.createServer(client => { + client.pipe(split()) + .on('data', function(line) { + if (!line.trim()) return; + clientsToUpdate.forEach(c => c.write(line)); + }); + }); + + updateListener.listen(config.updateListener.port, 'localhost', () => { + console.log('Listening for updates on :' + config.updateListener.port); + }); + + // TODO: upgrade to Apache 2.4 - http://stackoverflow.com/q/27526281/106302 + const updateServer = sockjs.createServer({ + sockjs_url : 'http://cdn.jsdelivr.net/sockjs/1.1.1/sockjs.min.js', + websocket : false, + }); + + updateServer.on('connection', function(conn) { + clientsToUpdate.push(conn); + conn.on('close', function() { + clientsToUpdate = clientsToUpdate.filter(c => c !== conn); + }); + }); + + updateServer.installHandlers(server, { + prefix : (config.http.basePath || '') + '/sockjs' + }); + apiRouter.get('/data', (req, res) => { const min = +req.query.min || +moment.utc().subtract(2, 'days'); const max = +req.query.max || +moment.utc(); @@ -71,12 +107,14 @@ require('./lib/db').connect((err, db) => { app.use(cors()); app.use(config.http.basePath || '/', apiRouter); - app.listen(config.http.port, () => { - console.log('Listening on :' + config.http.port); + server.listen(config.http.port, () => { + console.log('Listening for requests on :' + config.http.port); }); process.on('SIGINT', () => { console.log(); + updateListener.close(); + console.log('Update listener closed'); db.end(err => { if (err) throw err; console.log('MySQL connection closed');