Skip to content

Commit

Permalink
Merge pull request #512 from mStirner/dev
Browse files Browse the repository at this point in the history
Working on issues
  • Loading branch information
mStirner authored Dec 5, 2024
2 parents 719ab87 + f58dfba commit cf51414
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 33 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { Duplex, finished } = require("stream");
const { interfaceStreams } = require("../../system/shared.js");

const timeout = require("../../helper/timeout");
const Adapter = require("./class.adapter.js");
const Adapter = require("./!class.adapter.js");
const kSource = Symbol("source");

// https://www.programmersought.com/article/42661306247/
Expand Down
9 changes: 6 additions & 3 deletions components/devices/class.device.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
const Joi = require("joi");
const mongodb = require("mongodb");

const InterfaceStream = require("./class.interfaceStream.js");
//const InterfaceStream = require("./class.interfaceStream.js");
const Interface = require("./class.interface.js");
const Item = require("../../system/component/class.item.js");

const mixins = require("../../helper/mixins.js");
//const mixins = require("../../helper/mixins.js");
//const injectMethod = require("../../helper/injectMethod.js");

//const { parse, calculateChecksum } = require("./net-helper.js");
Expand Down Expand Up @@ -41,11 +41,13 @@ module.exports = class Device extends Item {
// for each interface class, create a interface stream
this.interfaces = props.interfaces.map((obj) => {

return new Interface(obj);

// NOTE: refactor interfaceStream in v4
// move .bridge method there and pass device instance?
// > Would this also create a ciruclar reference in Interface class
// > since its stored via `Object.defineProperty(this, "stream",...);`
/*
let stream = new InterfaceStream({
// duplex stream options
emitClose: false
Expand All @@ -72,7 +74,7 @@ module.exports = class Device extends Item {
device: this._id
}, cb);
});
*/
*
// "hide" stream behind iface object
Expand All @@ -82,6 +84,7 @@ module.exports = class Device extends Item {
setPrototype: true,
//transparent: false
});
*/

});

Expand Down
30 changes: 27 additions & 3 deletions components/devices/class.interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const timeout = require("../../helper/timeout.js");
const promisfy = require("../../helper/promisify.js");

const PENDING_BRIDGES = new Set();
const WEBSOCKET_SERVER = new Map();

/**
* @description
Expand All @@ -35,11 +36,12 @@ const PENDING_BRIDGES = new Set();
*/
module.exports = class Interface {

constructor(obj, stream) {
constructor(obj/*, stream*/) {

Object.assign(this, obj);
this._id = String(obj._id);

/*
// hide stream object on interface
Object.defineProperty(this, "stream", {
value: stream
Expand All @@ -51,6 +53,7 @@ module.exports = class Interface {
//let { interfaceStreams } = global.sharedObjects;
let { interfaceStreams } = require("../../system/shared.js");
interfaceStreams.set(this._id, stream);
*/

// hot fix for #350
/*
Expand Down Expand Up @@ -523,15 +526,35 @@ module.exports = class Interface {
// feedback
logger.debug(`Bridge closed, destroy everything: iface ${this._id} <-> ${proto}://${host}:${port} (${request.uuid})`);

// TODO: Improve error handling/forwarding/cleanup
// socket.destroy() throws ABORT_ERR after emitting custom connection error
// socket.end() does not throw, but is it enough to cleanup everything?
// does it matter that 2 diffrent errors events are emitted?
// 1) "ECONN*", 2) AbortError after calling socket.destroy()
// The ABORT_ERR is not emitted as error, `// Unhandled 'error' event`...
// on what instance is the error thrown?

// destroy everything
socket.destroy();
readable.destroy();
writable.destroy();

});

writable.pipe(stream);
stream.pipe(readable);
// forward error on WebSocket.createWebSocketStream
// used for syscall errors forwarding from connector
stream.once("error", (...args) => {
socket.emit("error", ...args);
});

process.nextTick(() => {

stream.emit("open");

writable.pipe(stream);
stream.pipe(readable);

});

}
});
Expand Down Expand Up @@ -613,6 +636,7 @@ module.exports = class Interface {
*/

static PENDING_BRIDGES = PENDING_BRIDGES;
static WEBSOCKET_SERVER = WEBSOCKET_SERVER;

static socket(iface, cb) {
return promisfy((done) => {
Expand Down
4 changes: 2 additions & 2 deletions components/devices/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const COMPONENT = require("../../system/component/class.component.js");

const Device = require("./class.device.js");
const Interface = require("./class.interface.js");
const InterfaceStream = require("./class.interfaceStream.js");
//const InterfaceStream = require("./class.interfaceStream.js");

/**
* @description
Expand Down Expand Up @@ -147,7 +147,7 @@ class C_DEVICES extends COMPONENT {
super("devices", Device.schema(), [
Device,
Interface,
InterfaceStream
//InterfaceStream
]);


Expand Down
9 changes: 8 additions & 1 deletion components/endpoints/class.command.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,14 @@ module.exports = class Command {
this.#privates.set("timeout", Number(process.env.COMMAND_RESPONSE_TIMEOUT));

// set default command handler worker function
this.#privates.set("handler", (cmd, { stream }, params, done) => {
this.#privates.set("handler", (cmd, iface, params, done) => {

let err = new Error("DEFAULT_COMMAND_HANDLER_REMOVED");

done(err);
//throw err;

/*
if (!cmd.payload) {
done(new Error("NO_PAYLOAD_DEFINED"));
return;
Expand Down Expand Up @@ -139,6 +145,7 @@ module.exports = class Command {
}
});
*/

});

Expand Down
83 changes: 70 additions & 13 deletions components/plugins/class.plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const Joi = require("joi");
const mongodb = require("mongodb");
const logger = require("../../system/logger/index.js");
const semver = require("semver");
const pkg = require("../../package.json");
//const pkg = require("../../package.json");
const uuid = require("uuid");

const Item = require("../../system/component/class.item.js");
Expand Down Expand Up @@ -83,6 +83,60 @@ module.exports = class Plugin extends Item {
return Plugin.schema().validate(data);
}

static init(data, logger) {

let init = (dependencies, cb) => {
try {

// NOTE: Monkey patch ready/abort method to init?
// A plugin could siganlize if its ready or needs to be restarted
/*
let init = new Promise((resolve, reject) => {
init.ready = resolve;
init.abort = reject;
});
*/

const granted = dependencies.every((c) => {
if (data.intents.includes(c)) {

return true;

} else {

logger.warn(`Plugin ${data.uuid} (${data.name}) wants to access not registerd intens "${c}"`);
return false;

}
});

if (granted) {

let components = dependencies.map((name) => {
return require(path.resolve(process.cwd(), `components/${name}`));
});

cb(data, components);
return init;

} else {

throw new Error(`Unregisterd intents access approach`);

}

} catch (err) {

logger.error(err, `Plugin could not initalize!`, err.message);
throw err;

}
};

return init;

}

/**
* @function start
* Start installed plugin
Expand All @@ -94,11 +148,13 @@ module.exports = class Plugin extends Item {
// feedback
logger.debug(`Start plugin "${this.name}"...`);

let json = {};
//let json = {};
let plugin = path.resolve(process.cwd(), "plugins", this.uuid);
let file = path.resolve(plugin, "package.json");
//let file = path.resolve(plugin, "package.json");

// 1) check if plugin is compatible
// removed, see #511
/*
try {
let content = fs.readFileSync(file);
Expand All @@ -122,10 +178,12 @@ module.exports = class Plugin extends Item {
}
}
*/

// 2) start plugin
if (fs.existsSync(plugin)) {

/*
let init = (dependencies, cb) => {
try {
Expand All @@ -136,7 +194,7 @@ module.exports = class Plugin extends Item {
init.ready = resolve;
init.abort = reject;
});
*/
*
const granted = dependencies.every((c) => {
if (this.intents.includes(c)) {
Expand Down Expand Up @@ -173,27 +231,26 @@ module.exports = class Plugin extends Item {
}
};

init[Symbol.for("uuid")] = this.uuid;
*/

try {

let returns = require(path.resolve(plugin, "index.js"))(this, this.logger, init);
let init = Plugin.init(this, this.logger);
//init[Symbol.for("uuid")] = this.uuid;

if (!returns) {
return;
}
let returns = require(path.resolve(plugin, "index.js"))(this, this.logger, init);

if (returns[Symbol.for("uuid")] !== this.uuid) {
logger.warn(`Plugin "${this.uuid}" (${this.name}) does not return the init function!`);
if (returns !== init) {
throw new Error("Invalid init function returnd!");
}

this.started = true;

} catch (err) {
logger.error(`Error in plugin "${this.name}": `, err);

logger.error(err, `Error in plugin "${this.name}": `);
throw err;

}

} else {
Expand Down
4 changes: 4 additions & 0 deletions components/scenes/class.scene.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,13 @@ module.exports = class Scene extends Item {

trigger() {

let { logger } = Scene.scope;
logger.info(`Trigger scene "${this.name}" (${this._id})`);

// fix #507
// stop previous running scene
if (this.states.running && this._ac) {
logger.debug(`Abort previously running scene "${this.name}" (${this._id})`);
this._ac.abort();
}

Expand Down
Loading

0 comments on commit cf51414

Please sign in to comment.