Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create a https agent for interfaces/requests #340

Closed
mStirner opened this issue Oct 2, 2023 · 5 comments
Closed

create a https agent for interfaces/requests #340

mStirner opened this issue Oct 2, 2023 · 5 comments

Comments

@mStirner
Copy link
Member

mStirner commented Oct 2, 2023

    httpsAgent(options = {}) {

        let agent = new Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        //let settings = this.settings;


        agent.createConnection = ({ host = null, port = null }) => {

            console.log(`############## Create connection to tcp://${host}:${port}`);


            let readable = new PassThrough();
            let writable = new PassThrough();


            // TODO Implement "auto-drain" when no upstream is attached -> Move this "lower", e.g. before ws upstream?
            /*
            let writable = new Transform({
                transform(chunk, enc, cb) {

                    debugger;

                    //console.log("this.stream",);
                    console.error(">>>> Write data, flowing?", str.upstream ? true : false, settings.host);

                    if (str.upstream) {
                        this.push(chunk);
                    } else {
                        while (this.read() !== null) {
                            // do nothing with writen input data
                            // empty readable queue
                        }
                    }

                    cb();

                }
            });
            */


            let stream = new Duplex.from({
                readable,
                writable
            });

            stream.destroy = (...args) => {
                console.log("socket.destroy();", args);
            };

            stream.ref = (...args) => {
                console.log("socket.unref();", args);
            };

            stream.unref = (...args) => {
                console.log("socket.unref();", args);
            };

            stream.setKeepAlive = (...args) => {
                console.log("socket.setKeepAlive()", args);
            };

            stream.setTimeout = (...args) => {
                console.log("socket.setTimeout();", args);
            };

            stream.setNoDelay = (...args) => {
                console.log("socket.setNotDelay();", args);
            };

            this.stream.pipe(readable, { end: false });
            writable.pipe(this.stream, { end: false });

            return tls.connect({
                socket: stream,
                host: this.settings.host,
                ...options
            });

        };

        return agent;

    }
@mStirner
Copy link
Member Author

Dunno why, but this attempt did not work:

Code
  _createAgentStream() {

      // cleanup, could be possible be piped from previous "connections"
      this.stream.unpipe();

      /*
      // check if passed host/port matches interface settings?
      if (host != settings.host || port != settings.port) {

          let msg = "host/port for interface missmatch, expected:\r\n";
          msg += `\thost = ${host}; got = ${settings.host}\r\n`;
          msg += `\tport = ${settings.port}; got = ${settings.port}`;

          throw new Error(msg);

      }
      */

      //let readable = new PassThrough();
      //let writable = new PassThrough();

      let readable = new Transform({
          transform(chunk, enc, cb) {

              //console.log("[incoming]", chunk.toString());

              // temp fix for #343
              // this is not the prefered fix for this issue
              // it should be handled on "stream/socket" level instead
              // the issue above occoured with a "shelly 1pm" and parallel requests to /status /ota /settings
              // NOTE: what if the body contains json that has a `connection: close` property/key/value?
              chunk = chunk.toString().replace(/connection:\s?close\r\n/i, "connection: keep-alive\r\n");

              this.push(chunk);
              cb();

          }
      });

      let writable = new Transform({
          transform(chunk, enc, cb) {

              //console.log("[outgoing]", chunk.toString());
              this.push(chunk);
              cb();

          }
      });


      // TODO Implement "auto-drain" when no upstream is attached -> Move this "lower", e.g. before ws upstream?
      /*
      let writable = new Transform({
          transform(chunk, enc, cb) {

              debugger;

              //console.log("this.stream",);
              console.error(">>>> Write data, flowing?", str.upstream ? true : false, settings.host);

              if (str.upstream) {
                  this.push(chunk);
              } else {
                  while (this.read() !== null) {
                      // do nothing with writen input data
                      // empty readable queue
                  }
              }

              cb();

          }
      });
      */


      let stream = new Duplex.from({
          readable,
          writable
      });

      stream.destroy = (...args) => {
          console.log("socket.destroy();", args);
      };

      stream.ref = (...args) => {
          console.log("socket.unref();", args);
      };

      stream.unref = (...args) => {
          console.log("socket.unref();", args);
      };

      stream.setKeepAlive = (...args) => {
          console.log("socket.setKeepAlive()", args);
      };

      stream.setTimeout = (...args) => {
          console.log("socket.setTimeout();", args);
      };

      stream.setNoDelay = (...args) => {
          console.log("socket.setNotDelay();", args);
      };

      this.stream.pipe(readable, { end: false });
      writable.pipe(this.stream, { end: false });

      return stream;

  }


  // NEW VERSION, fix for #329
  httpAgent(options = {}) {

      let agent = new http.Agent({
          keepAlive: true,
          maxSockets: 1,
          ...options
      });

      //let settings = this.settings;


      agent.createConnection = ({ host = null, port = null }) => {

          console.log(`############## Create connection to tcp://${host}:${port}`);
          return this._createAgentStream();

      };

      return agent;

  }

  httpsAgent(options = {}) {

      let agent = new https.Agent({
          keepAlive: true,
          maxSockets: 1,
          ...options
      });

      agent.createConnection = ({ host = null, port = null }) => {

          console.log(`############## Create connection to tcp://${host}:${port}`);
          let socket = this._createAgentStream();

          return tls.connect({
              socket,
              host: this.settings.host,
              ...options
          });

      };

      return agent;

  }

@mStirner
Copy link
Member Author

After overriding the http header "connection" here:

// Incoming data filter: rewrites a `connection: close` response header to keep-alive.
let readable = new Transform({
    transform(chunk, enc, cb) {
        //console.log("[incoming]", chunk.toString());
        // temp fix for #343
        // this is not the prefered fix for this issue
        // it should be handled on "stream/socket" level instead
        // the issue above occoured with a "shelly 1pm" and parallel requests to /status /ota /settings
        // NOTE: what if the body contains json that has a `connection: close` property/key/value?
        //
        // Decode as latin1 (one char per byte) and re-encode the same way, so every byte
        // that is not part of the header survives unchanged. The default utf8 decoding
        // replaces invalid sequences, which changes the length of binary data (e.g. TLS records).
        let text = chunk.toString("latin1").replace(/connection:\s?close\r\n/i, "connection: keep-alive\r\n");
        this.push(Buffer.from(text, "latin1"));
        cb();
    }
});
, this breaks the tls encryption:

Error: write EPROTO 140016587843520:error:1417110F:SSL routines:tls_process_server_hello:bad length:../deps/openssl/openssl/ssl/statem/statem_clnt.c:1485:

    at WriteWrap.onWriteComplete [as oncomplete] (node:internal/stream_base_commons:94:16)
    at Duplexify.ondata (node:internal/js_stream_socket:77:22)
    at Duplexify.emit (node:events:513:28)
    at addChunk (node:internal/streams/readable:315:12)
    at readableAddChunk (node:internal/streams/readable:289:9)
    at Duplexify.Readable.push (node:internal/streams/readable:228:10)
    at d._read (node:internal/streams/duplexify:350:16)
    at Transform.<anonymous> (node:internal/streams/duplexify:333:9)
    at Transform.emit (node:events:513:28)
    at emitReadable_ (node:internal/streams/readable:578:12) {
  errno: -71,
  code: 'EPROTO',
  syscall: 'write'
}

This could be resolved by not using a common "createStream" function and omitting the header manipulation in the TLS variant.
But then it is possible that other things break and the interfaceStream gets closed/destroyed.

@mStirner
Copy link
Member Author

mStirner commented Dec 18, 2023

Works, but have concerns about the keep alive & socket destroying functionality.

    _agentConnection({ protocol = "http:" }) {

        // cleanup, could be possible be piped from previous "connections"
        this.stream.unpipe();

        let readable = new Transform({
            transform(chunk, enc, cb) {

                //console.log("[incoming]", chunk.toString());

                // temp fix for #343
                // this is not the prefered fix for this issue
                // it should be handled on "stream/socket" level instead
                // the issue above occoured with a "shelly 1pm" and parallel requests to /status /ota /settings
                // NOTE: what if the body contains json that has a `connection: close` property/key/value?
                if (protocol !== "https:") {
                    chunk = chunk.toString().replace(/connection:\s?close\r\n/i, "connection: keep-alive\r\n");
                }

                this.push(chunk);
                cb();

            }
        });

        let writable = new Transform({
            transform(chunk, enc, cb) {

                //console.log("[outgoing]", chunk.toString());

                this.push(chunk);
                cb();

            }
        });

        let stream = new Duplex.from({
            readable,
            writable
        });

        [
            "destroy", "ref", "unref",
            "setKeepAlive", "setTimeout", "setNoDelay"
        ].forEach((fnc) => {

            // fake methods above
            // dont need any of these
            stream[fnc] = () => { };

        });

        this.stream.pipe(readable, { end: false });
        writable.pipe(this.stream, { end: false });

        return stream;

    }


    // NEW VERSION, fix for #329
    httpAgent(options = {}) {

        if (this.cachedAgent) {
            return this.cachedAgent;
        }

        let agent = new http.Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        agent.createConnection = (...args) => {
            return this._agentConnection(...args);
        };

        this.cachedAgent = agent;
        return agent;

    }


    httpsAgent(options = {}) {

        if (this.cachedAgent) {
            return this.cachedAgent;
        }

        let agent = new https.Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        agent.createConnection = (...args) => {

            let { host, port } = this.settings;
            let socket = this._agentConnection(...args);

            return tls.connect({
                socket,
                host,
                port,
                ...options
            });

        };

        this.cachedAgent = agent;
        return agent;

    }

full code/backup (delete the uncommitted file in the dev branch):

const Joi = require("joi");
const http = require("http");
const https = require("https");
const tls = require("tls");
const mongodb = require("mongodb");
const { Transform, Duplex } = require("stream");

/**
 * @description
 * Implements a interface item, that hides a duplex stream in it, to read/write data from a device interface
 * 
 * @class Interface
 * 
 * @param {Object} obj Object that matches the item schema. See properties below:
 * @param {InterfaceStream} stream Instance of a InterfaceStream object
 * 
 * @property {String} _id MongoDB Object id is as string
 * @property {String} type Type of the interface, `SERIAL` or `ETHERNET`
 * @property {Object} settings Interface specifiy type settings.
 * @property {Array} [adapter=["raw"]] Array of adapter to use for encoding/decoding data: `base64`, `eiscp`, `json`, `raw`
 * 
 * @see interfaceStream components/devices/class.interfaceStream.js
 * @link https://github.com/OpenHausIO/backend/blob/dev/components/devices/class.interface.js#L27
 * @link https://github.com/OpenHausIO/backend/blob/dev/components/devices/class.interface.js#L40
 */
module.exports = class Interface {

    constructor(obj, stream) {

        Object.assign(this, obj);
        this._id = String(obj._id);

        // hide stream object on interface
        Object.defineProperty(this, "stream", {
            value: stream
        });

        // share/set interface stream
        // see #86
        //let { interfaceStreams } = global.sharedObjects;
        let { interfaceStreams } = require("../../system/shared.js");
        interfaceStreams.set(this._id, stream);

        // hot fix for #350
        Object.defineProperty(this, "cachedAgent", {
            value: null,
            enumerable: false,
            configurable: false,
            writable: true
        });

    }

    /**
     * @function schema
     * Interface schema 
     * 
     * @static
     * 
     * @returns {Object} https://joi.dev/api/?v=17.6.0#anyvalidatevalue-options
     */
    static schema() {

        // settings from node.js serialport (https://serialport.io/docs/api-bindings-cpp#open)
        const SERIAL = Joi.object({
            device: Joi.string().required(),
            baudRate: Joi.number().default(9600),
            dataBits: Joi.number().allow(5, 6, 7, 8).default(8),
            stopBits: Joi.number().allow(1, 1.5, 2).default(1),
            parity: Joi.string().valid("even", "odd", "none").default("none"),
            rtscts: Joi.boolean().default(false),
            xon: Joi.boolean().default(false),
            xoff: Joi.boolean().default(false),
            xany: Joi.boolean().default(false),
            hupcl: Joi.boolean().default(true)
        }).required();

        const ETHERNET = Joi.object({
            //transport: Joi.string().valid("tcp", "udp", "raw").default("tcp"),
            socket: Joi.string().valid("tcp", "udp", "raw").default("tcp"),
            host: Joi.string().required(),
            port: Joi.number().min(1).max(65535).required(),
            // https://regex101.com/r/wF7Nfa/1
            // https://stackoverflow.com/a/50080404/5781499
            mac: Joi.string().default(null).allow(null).regex(/^([0-9a-fA-F]{2}[:]){5}[0-9a-fA-F]{2}$/)
        }).required();

        return Joi.object({
            _id: Joi.string().pattern(/^[0-9a-fA-F]{24}$/).default(() => {
                return String(new mongodb.ObjectId());
            }),
            type: Joi.string().default("ETHERNET"),
            settings: Joi.object().when("type", {
                is: "ETHERNET",
                then: ETHERNET
            }).when("type", {
                is: "SERIAL",
                then: SERIAL
            }),
            adapter: Joi.array().items("eiscp", "raw", "eol").default(["raw"]),
            description: Joi.string().allow(null).default(null)
        });

    }


    /**
     * @function httpAgent
     * Creates a custom http agent which use the underalying interfaceStream to forward data
     * 
     * @param {Object} [options] httpAgent options 
     * 
     * @returns {Object} httpAgent object
     * 
     * @link https://nodejs.org/dist/latest-v16.x/docs/api/http.html#new-agentoptions 
     */
    /*
    // *OLD* function, see #329
    httpAgent(options) {

        options = Object.assign({
            keepAlive: true,
            //maxSockets: 1,
            keepAliveMsecs: 3000,        // use this as websocket ping/pong value to detect broken connections?
        }, options);

        // stream nc tcp socket 
        // https://stackoverflow.com/a/33514724/5781499
        let agent = new Agent(options);

        // use interface stream as socket
        // createConnection returns duplex stream
        // https://nodejs.org/dist/latest-v14.x/docs/api/http.html#http_agent_createconnection_options_callback
        agent.createConnection = (options, cb) => {

            let input = new PassThrough();
            let output = new PassThrough();


            this.stream.pipe(input, { end: false });
            output.pipe(this.stream, { end: false });


            let socket = Duplex.from({
                readable: input,
                writable: output
            });

            // when multiple reuqests are done parallal, sometimes a AbortedErr is thrown
            // see #329 for details
            // TODO: Check if the upstream is drained, and perform requests in series
            // As "quick fix" till a solution is found for #312 catch the trown error
            socket.on("error", (err) => {
                console.log("Catched error on http.agent.createConnection", err);
                this.stream.destroy();
            });

            /*
                        [socket, this.stream, input, output].forEach((stream) => {
                            let cleanup = finished(stream, (err) => {
            
                                console.log("Socket duplex stream ended", err);
            
                                let chunk;
            
                                while (null !== (chunk = input.read())) {
                                    console.log(`>>>>>> Read ${chunk.length} bytes of data...`);
                                }
            
                                while (null !== (chunk = output.read())) {
                                    console.log(`>>>>>> Read ${chunk.length} bytes of data...`);
                                }
            
                                input.removeAllListeners();
                                output.removeAllListeners();
            
                                this.stream.unpipe(input);
                                output.unpipe(this.stream);
            
                                cleanup();
            
                            });
                        });
            *


            // TODO implement other socket functions?!
            //if (process.env.NODE_ENV !== "production") {
            socket.ref = (...args) => { console.log("socket.ref called", ...args); };
            socket.unref = (...args) => { console.log("socket.unref called", ...args); };
            socket.setKeepAlive = (...args) => { console.log("socket.setKeepAlive called", ...args); };
            socket.setTimeout = (...args) => { console.log("socket.setTimeout called", ...args); };
            socket.setNoDelay = (...args) => { console.log("socket.setNoDelay called", ...args); };
            // socket.remoteAddress=this.settings.host
            // socket.remotePort=this.settings.port
            //}

            //return socket;
            cb(null, socket);

        };

        return agent;

    }
    */

    _agentConnection() {

        // cleanup, could be possible be piped from previous "connections"
        this.stream.unpipe();

        let readable = new Transform({
            transform(chunk, enc, cb) {

                //console.log("[incoming]", chunk);

                // temp fix for #343
                // this is not the prefered fix for this issue
                // it should be handled on "stream/socket" level instead
                // the issue above occoured with a "shelly 1pm" and parallel requests to /status /ota /settings
                // NOTE: what if the body contains json that has a `connection: close` property/key/value?
                // NOTE: This alos breaks websocket connections!!!!
                /*
                if (protocol !== "https:") {
                    chunk = chunk.toString().replace(/connection:\s?close\r\n/i, "connection: keep-alive\r\n");
                }
                */

                this.push(chunk);
                cb();

            }
        });

        let writable = new Transform({
            transform(chunk, enc, cb) {

                //console.log("[outgoing]", chunk);

                this.push(chunk);
                cb();

            }
        });

        let stream = new Duplex.from({
            readable,
            writable
        });

        [
            "destroy", "ref", "unref",
            "setKeepAlive", "setTimeout", "setNoDelay"
        ].forEach((fnc) => {

            // fake methods above
            // dont need any of these
            stream[fnc] = () => { };

        });

        this.stream.pipe(readable, { end: false });
        writable.pipe(this.stream, { end: false });

        return stream;

    }


    // NEW VERSION, fix for #329
    httpAgent(options = {}) {

        if (this.cachedAgent) {
            return this.cachedAgent;
        }

        let agent = new http.Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        agent.createConnection = (...args) => {
            return this._agentConnection(...args);
        };

        this.cachedAgent = agent;
        return agent;

    }


    httpsAgent(options = {}) {

        if (this.cachedAgent) {
            return this.cachedAgent;
        }

        let agent = new https.Agent({
            keepAlive: true,
            maxSockets: 1,
            ...options
        });

        agent.createConnection = (...args) => {

            let { host, port } = this.settings;
            let socket = this._agentConnection(...args);

            return tls.connect({
                socket,
                host,
                port,
                ...options
            });

        };

        this.cachedAgent = agent;
        return agent;

    }

};

@mStirner
Copy link
Member Author

Important

Wait for refactoring in v4: #460

@mStirner
Copy link
Member Author

Test plugin:

const Store = require("../../components/store/class.store.js");
const request = require("../../helper/request.js");
const { Agent } = require("http");

module.exports = (info, logger, init) => {
    return init([
        "devices",
        "endpoints",
        "plugins",
        "rooms",
        "ssdp",
        "store",
        "users",
        "vault"
    ], async (scope, [
        C_DEVICES,
        C_ENDPOINTS,
        C_PLUGINS,
        C_ROOMS,
        C_SSDP,
        C_STORE,
        C_USERS,
        C_VAULT
    ]) => {

        /*
        let store = C_STORE.items[0];

        setTimeout(() => {

            console.clear();

            console.log("Store", store);

            //C_STORE.validate()

            let { value, error } = Store.validate(store);

            console.log(error, value);

        }, 3000);
        */




        C_DEVICES.found({
            labels: [
                "example=true",
                "host=example.com",
                "custom=abcd",
                "test=true"
            ]
        }, (device) => {
            setTimeout(() => {



                console.log("Devce found", device);

                let iface = device.interfaces[1];
                let { host, port } = iface.settings;
                let agent = iface.httpsAgent();


                request(`https://${host}:${port}`, {
                    agent
                }, (err, result) => {
                    console.log(err || result.body)
                });


            }, 3000);
        }, async (query) => {

            let device = await C_DEVICES.add({
                ...query,
                name: "example.com",
                interfaces: [{
                    type: "ETHERNET",
                    settings: {
                        host: "example.com",
                        port: 80
                    }
                }, {
                    type: "ETHERNET",
                    settings: {
                        host: "example.com",
                        port: 443
                    }
                }]
            });

            console.log("Device added", device)

        });




    });
};

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant