diff --git a/components/devices/class.adapter.js b/components/devices/!class.adapter.js similarity index 100% rename from components/devices/class.adapter.js rename to components/devices/!class.adapter.js diff --git a/components/devices/class.interfaceStream.js b/components/devices/!class.interfaceStream.js similarity index 99% rename from components/devices/class.interfaceStream.js rename to components/devices/!class.interfaceStream.js index 49a8fc1..0ae4c00 100644 --- a/components/devices/class.interfaceStream.js +++ b/components/devices/!class.interfaceStream.js @@ -4,7 +4,7 @@ const { Duplex, finished } = require("stream"); const { interfaceStreams } = require("../../system/shared.js"); const timeout = require("../../helper/timeout"); -const Adapter = require("./class.adapter.js"); +const Adapter = require("./!class.adapter.js"); const kSource = Symbol("source"); // https://www.programmersought.com/article/42661306247/ diff --git a/components/devices/class.device.js b/components/devices/class.device.js index 6ca0400..a374bef 100644 --- a/components/devices/class.device.js +++ b/components/devices/class.device.js @@ -1,11 +1,11 @@ const Joi = require("joi"); const mongodb = require("mongodb"); -const InterfaceStream = require("./class.interfaceStream.js"); +//const InterfaceStream = require("./class.interfaceStream.js"); const Interface = require("./class.interface.js"); const Item = require("../../system/component/class.item.js"); -const mixins = require("../../helper/mixins.js"); +//const mixins = require("../../helper/mixins.js"); //const injectMethod = require("../../helper/injectMethod.js"); //const { parse, calculateChecksum } = require("./net-helper.js"); @@ -41,11 +41,13 @@ module.exports = class Device extends Item { // for each interface class, create a interface stream this.interfaces = props.interfaces.map((obj) => { + return new Interface(obj); // NOTE: refactor interfaceStream in v4 // move .bridge method there and pass device instance? // > Would this also create a ciruclar reference in Interface class // > since its stored via `Object.defineProperty(this, "stream",...);` + /* let stream = new InterfaceStream({ // duplex stream options emitClose: false @@ -72,7 +74,7 @@ module.exports = class Device extends Item { device: this._id }, cb); }); - */ + * // "hide" stream behind iface object @@ -82,6 +84,7 @@ module.exports = class Device extends Item { setPrototype: true, //transparent: false }); + */ }); diff --git a/components/devices/class.interface.js b/components/devices/class.interface.js index cb30fd0..031484d 100644 --- a/components/devices/class.interface.js +++ b/components/devices/class.interface.js @@ -14,6 +14,7 @@ const timeout = require("../../helper/timeout.js"); const promisfy = require("../../helper/promisify.js"); const PENDING_BRIDGES = new Set(); +const WEBSOCKET_SERVER = new Map(); /** * @description @@ -35,11 +36,12 @@ const PENDING_BRIDGES = new Set(); */ module.exports = class Interface { - constructor(obj, stream) { + constructor(obj/*, stream*/) { Object.assign(this, obj); this._id = String(obj._id); + /* // hide stream object on interface Object.defineProperty(this, "stream", { value: stream @@ -51,6 +53,7 @@ module.exports = class Interface { //let { interfaceStreams } = global.sharedObjects; let { interfaceStreams } = require("../../system/shared.js"); interfaceStreams.set(this._id, stream); + */ // hot fix for #350 /* @@ -523,6 +526,14 @@ module.exports = class Interface { // feedback logger.debug(`Bridge closed, destroy everything: iface ${this._id} <-> ${proto}://${host}:${port} (${request.uuid})`); + // TODO: Improve error handling/forwarding/cleanup + // socket.destroy() throws ABORT_ERR after emitting custom connection error + // socket.end() does not throw, but is it enough to cleanup everything? + // does it matter that 2 diffrent errors events are emitted? + // 1) "ECONN*", 2) AbortError after calling socket.destroy() + // The ABORT_ERR is not emitted as error, `// Unhandled 'error' event`... + // on what instance is the error thrown? + // destroy everything socket.destroy(); readable.destroy(); @@ -530,8 +541,20 @@ module.exports = class Interface { }); - writable.pipe(stream); - stream.pipe(readable); + // forward error on WebSocket.createWebSocketStream + // used for syscall errors forwarding from connector + stream.once("error", (...args) => { + socket.emit("error", ...args); + }); + + process.nextTick(() => { + + stream.emit("open"); + + writable.pipe(stream); + stream.pipe(readable); + + }); } }); @@ -613,6 +636,7 @@ module.exports = class Interface { */ static PENDING_BRIDGES = PENDING_BRIDGES; + static WEBSOCKET_SERVER = WEBSOCKET_SERVER; static socket(iface, cb) { return promisfy((done) => { diff --git a/components/devices/index.js b/components/devices/index.js index 92bb8f5..52ed92e 100644 --- a/components/devices/index.js +++ b/components/devices/index.js @@ -18,7 +18,7 @@ const COMPONENT = require("../../system/component/class.component.js"); const Device = require("./class.device.js"); const Interface = require("./class.interface.js"); -const InterfaceStream = require("./class.interfaceStream.js"); +//const InterfaceStream = require("./class.interfaceStream.js"); /** * @description @@ -147,7 +147,7 @@ class C_DEVICES extends COMPONENT { super("devices", Device.schema(), [ Device, Interface, - InterfaceStream + //InterfaceStream ]); diff --git a/components/endpoints/class.command.js b/components/endpoints/class.command.js index bc9aecf..d9eb34a 100644 --- a/components/endpoints/class.command.js +++ b/components/endpoints/class.command.js @@ -97,8 +97,14 @@ module.exports = class Command { this.#privates.set("timeout", Number(process.env.COMMAND_RESPONSE_TIMEOUT)); // set default command handler worker function - this.#privates.set("handler", (cmd, { stream }, params, done) => { + this.#privates.set("handler", (cmd, iface, params, done) => { + let err = new Error("DEFAULT_COMMAND_HANDLER_REMOVED"); + + done(err); + //throw err; + + /* if (!cmd.payload) { done(new Error("NO_PAYLOAD_DEFINED")); return; @@ -139,6 +145,7 @@ module.exports = class Command { } }); + */ }); diff --git a/components/plugins/class.plugin.js b/components/plugins/class.plugin.js index 26ba9ca..657c8eb 100644 --- a/components/plugins/class.plugin.js +++ b/components/plugins/class.plugin.js @@ -4,7 +4,7 @@ const Joi = require("joi"); const mongodb = require("mongodb"); const logger = require("../../system/logger/index.js"); const semver = require("semver"); -const pkg = require("../../package.json"); +//const pkg = require("../../package.json"); const uuid = require("uuid"); const Item = require("../../system/component/class.item.js"); @@ -83,6 +83,60 @@ module.exports = class Plugin extends Item { return Plugin.schema().validate(data); } + static init(data, logger) { + + let init = (dependencies, cb) => { + try { + + // NOTE: Monkey patch ready/abort method to init? + // A plugin could siganlize if its ready or needs to be restarted + /* + let init = new Promise((resolve, reject) => { + init.ready = resolve; + init.abort = reject; + }); + */ + + const granted = dependencies.every((c) => { + if (data.intents.includes(c)) { + + return true; + + } else { + + logger.warn(`Plugin ${data.uuid} (${data.name}) wants to access not registerd intens "${c}"`); + return false; + + } + }); + + if (granted) { + + let components = dependencies.map((name) => { + return require(path.resolve(process.cwd(), `components/${name}`)); + }); + + cb(data, components); + return init; + + } else { + + throw new Error(`Unregisterd intents access approach`); + + } + + } catch (err) { + + logger.error(err, `Plugin could not initalize!`, err.message); + throw err; + + } + }; + + return init; + + } + /** * @function start * Start installed plugin @@ -94,11 +148,13 @@ module.exports = class Plugin extends Item { // feedback logger.debug(`Start plugin "${this.name}"...`); - let json = {}; + //let json = {}; let plugin = path.resolve(process.cwd(), "plugins", this.uuid); - let file = path.resolve(plugin, "package.json"); + //let file = path.resolve(plugin, "package.json"); // 1) check if plugin is compatible + // removed, see #511 + /* try { let content = fs.readFileSync(file); @@ -122,10 +178,12 @@ module.exports = class Plugin extends Item { } } + */ // 2) start plugin if (fs.existsSync(plugin)) { + /* let init = (dependencies, cb) => { try { @@ -136,7 +194,7 @@ module.exports = class Plugin extends Item { init.ready = resolve; init.abort = reject; }); - */ + * const granted = dependencies.every((c) => { if (this.intents.includes(c)) { @@ -173,27 +231,26 @@ module.exports = class Plugin extends Item { } }; - - init[Symbol.for("uuid")] = this.uuid; + */ try { - let returns = require(path.resolve(plugin, "index.js"))(this, this.logger, init); + let init = Plugin.init(this, this.logger); + //init[Symbol.for("uuid")] = this.uuid; - if (!returns) { - return; - } + let returns = require(path.resolve(plugin, "index.js"))(this, this.logger, init); - if (returns[Symbol.for("uuid")] !== this.uuid) { - logger.warn(`Plugin "${this.uuid}" (${this.name}) does not return the init function!`); + if (returns !== init) { throw new Error("Invalid init function returnd!"); } this.started = true; } catch (err) { - logger.error(`Error in plugin "${this.name}": `, err); + + logger.error(err, `Error in plugin "${this.name}": `); throw err; + } } else { diff --git a/components/scenes/class.scene.js b/components/scenes/class.scene.js index 7c0535f..805d172 100644 --- a/components/scenes/class.scene.js +++ b/components/scenes/class.scene.js @@ -129,9 +129,13 @@ module.exports = class Scene extends Item { trigger() { + let { logger } = Scene.scope; + logger.info(`Trigger scene "${this.name}" (${this._id})`); + // fix #507 // stop previous running scene if (this.states.running && this._ac) { + logger.debug(`Abort previously running scene "${this.name}" (${this._id})`); this._ac.abort(); } diff --git a/routes/router.api.devices.js b/routes/router.api.devices.js index 38146b8..919c640 100644 --- a/routes/router.api.devices.js +++ b/routes/router.api.devices.js @@ -1,13 +1,45 @@ const WebSocket = require("ws"); -const { finished } = require("stream"); +//const { finished } = require("stream"); const C_DEVICES = require("../components/devices"); +const { WEBSOCKET_SERVER } = require("../components/devices/class.interface.js"); //const iface_locked = new Map(); // move that to "event bus" //const { interfaceServer, interfaceStreams } = global.sharedObjects; -const { interfaceServer } = require("../system/shared.js"); - +//const { interfaceServer } = require("../system/shared.js"); + + +// map os syscall codes to ws codes +// https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent/code +const ERROR_CODE_MAPPINGS = { + 4001: "ECONNREFUSED", // -111 + 4002: "ECONNRESET", // -104 + 4003: "EADDRINUSE", // -98 + 4004: "EADDRNOTAVAIL", // -99 + 4005: "ETIMEDOUT", // -110 + 4006: "EHOSTUNREACH", // -113 + 4007: "ENETUNREACH", // -101 + 4008: "ENOTFOUND", // -3008 + 4009: "EPERM", // -1 + 4010: "EACCES", // -13 + 4011: "EPIPE", // -32 + 4012: "EINVAL", // -22 + 4013: "ENOENT", // -2 + "ECONNREFUSED": -111, // 4001 + "ECONNRESET": -104, // 4002 + "EADDRINUSE": -98, // 4003 + "EADDRNOTAVAIL": -99, // 4004 + "ETIMEDOUT": -110, // 4005 + "EHOSTUNREACH": -113, // 4006 + "ENETUNREACH": -101, // 4007 + "ENOTFOUND": -3008, // 4008 + "EPERM": -1, // 4009 + "EACCES": -13, // 4010 + "EPIPE": -32, // 4011 + "EINVAL": -22, // 4012 + "ENOENT": -2 // 4013 +}; module.exports = (app, router) => { @@ -45,17 +77,18 @@ module.exports = (app, router) => { // Goal should be: // - to eliminate the need of "shared.js" // - handle in router.get only ws handshake: "wss.handleUpgrade(...)" - if (!interfaceServer.has(req.params._iid)) { + if (!WEBSOCKET_SERVER.has(req.params._iid)) { let wss = new WebSocket.Server({ noServer: true }); - interfaceServer.set(req.params._iid, wss); + WEBSOCKET_SERVER.set(req.params._iid, wss); // listen only once to connectoin event // gets fired every time websocket client hit this url/route wss.on("connection", (ws, req) => { + // TODO: check for pending request, if not peding, terminate ws connection if (req.query?.uuid && req.query?.socket === "true" && req.query?.type === "response") { // new bridge/connector practice @@ -63,9 +96,27 @@ module.exports = (app, router) => { let stream = WebSocket.createWebSocketStream(ws); - ws.once("close", (...args) => { - stream.emit("close", ...args); - stream.destroy(); + ws.once("close", (code) => { + if (code >= 4000 && code <= 4999) { + + // error on connection attempt + // underlaying os trhowed error + // build custom connection error + let err = new Error("Bridging failed"); + err.code = ERROR_CODE_MAPPINGS[code]; + err.errno = ERROR_CODE_MAPPINGS[err.code]; + err.syscall = "connect"; + + stream.emit("error", err); + + } else { + + // no clue why closed, cleanup anyway + // TODO: check code and decide if error or success closing + //stream.emit("close"); // desotroy() emit close event(!|?) + stream.destroy(); + + } }); @@ -81,7 +132,12 @@ module.exports = (app, router) => { // old/legacy connection mechanism // TODO: Remove this in future versions + ws.close(1008, "LEGACY_CONNECTION_NOT_SUPPORTED_ANYMORE"); + + // maybe this is needed later for things like serial port data transmission + // how to forward data between a plugin and serialport, via CLI tool, like "socketize"? + /* let upstream = WebSocket.createWebSocketStream(ws); // Cleanup: https://nodejs.org/dist/latest-v16.x/docs/api/stream.html#streamfinishedstream-options-callback @@ -104,6 +160,7 @@ module.exports = (app, router) => { }); }); + */ } }); @@ -133,7 +190,7 @@ module.exports = (app, router) => { } - let wss = interfaceServer.get(req.params._iid); + let wss = WEBSOCKET_SERVER.get(req.params._iid); wss.handleUpgrade(req, req.socket, req.headers, (ws) => { diff --git a/routes/router.system.connector.js b/routes/router.system.connector.js index 3bc42ca..a4f2db9 100644 --- a/routes/router.system.connector.js +++ b/routes/router.system.connector.js @@ -38,8 +38,16 @@ module.exports = (router) => { if (PENDING_BRIDGES.size > 0 && ws.readyState === WebSocket.OPEN) { PENDING_BRIDGES.forEach((bridge) => { - ws.send(JSON.stringify(bridge)); + ws.send(JSON.stringify(bridge), () => { + + // sometimes it happens that pending bridges are not build + // but message is written to the connector + //let { remotePort, remoteAddress } = req.socket; + //console.log(err || `Writed to connector [${remoteAddress}:${remotePort}]: ${JSON.stringify(bridge)}`); + + }); }); + } });