Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[doc] How to use ZMQ (ZeroMQ) directly #10

Open
2 tasks
coolaj86 opened this issue Jun 23, 2023 · 0 comments
Open
2 tasks

[doc] How to use ZMQ (ZeroMQ) directly #10

coolaj86 opened this issue Jun 23, 2023 · 0 comments

Comments

@coolaj86
Copy link
Member

coolaj86 commented Jun 23, 2023

ZeroMQ.js@v6 (beta-not-beta)

To-Dos:

"use strict";

let Zmq = require("zeromq");

async function run() {
    let zmqUrl = "tcp://10.11.5.104:28332";
    //let zmqUrl = "tcp://10.11.5.1041:28332";
    //let zmqUrl = "tcp://10.11.5.104:28331";

    // replaces `let zmqSubSocket = zmq.socket("sub");`
    let sock = new Zmq.Subscriber();

    console.log(`ZeroMQ Subscriber connecting to '${zmqUrl}'`);
    void monitor(sock, onEvent);

    sock.connect(zmqUrl);
    console.log("[connect]");

    sock.subscribe("rawtxlock");
    console.log("[subscribe]");

    // receive() is in c++ land
    // https://github.com/zeromq/zeromq.js/blob/62f6e252f530ea05c86be15b06a58214eac1b34d/src/socket.cc#L307
    // asyncInterator is generic generated by TypeScript
    // https://github.com/zeromq/zeromq.js/blob/62f6e252f530ea05c86be15b06a58214eac1b34d/src/index.ts#L291C1-L292C1
    //console.log(sock);

    /*
    try {
        for await (let [topic, msg] of sock) {
            console.info(`${topic}:`);
            let hex = msg.toString("hex");
            console.info(hex);
        }
    } catch (e) {
        throw e;
    }
    */

    for (;;) {
        let [topic, msg] = await sock.receive().catch(errorToMessage);
        if (topic.length === 0) {
            // intentionally closed
            break;
        }
        if (topic === "error") {
            console.warn("[error] socket failed to receive", msg);
            continue;
        }

        console.info(`[topic] ${topic}:`);
        let hex = msg.toString("hex");
        console.info(hex);
    }

    function errorToMessage(e) {
        if (sock.closed) {
            if (e.code === "EAGAIN") {
                return ["", null];
            }
        }
        return ["error", e];
    }
}

async function onEvent(type, value, address, err) {
    if (err) {
        console.error(err.stack || err);
        return;
    }

    let subject = `[monitor] ${type}@${address}`;
    if (!value) {
        console.info(`${subject}:`);
        return;
    }

    console.info(`${subject}`);
    console.info(value);
}

// events
// https://github.com/zeromq/zeromq.js/blob/v6.0.0-beta.17/src/observer.cc#L15
let events = {
    errors: [
        "accept:error",
        "bind:error",
        "close:error",
        "handshake:error:auth",
        "handshake:error:other",
        "handshake:error:protocol",
    ],
    intervals: ["connect:retry"],
    notifiers: [
        "handshake",
        "connect",
        "connect:delay",
        "accept",
        "bind",
        "close",
        "disconnect",
        "end",
    ],
};

async function monitor(sock, cb) {
    for (;;) {
        let event;
        try {
            event = await sock.events.receive();
        } catch (e) {
            if (!sock.closed) {
                console.error("[error] monitor failed", e);
            }
            break;
        }

        let isError = events.errors.includes(event.type);
        if (isError) {
            let value = event.error?.errno || 0;
            void cb(event.type, value, event.address, event.error);
            continue;
        }

        let hasIntervalValue = events.intervals.includes(event.type);
        if (hasIntervalValue) {
            let value = event.interval;
            void cb(event.type, value, event.address, event.error);
            continue;
        }

        let isNotifier = events.notifiers.includes(event.type);
        if (isNotifier) {
            let value = null;
            void cb(event.type, value, event.address, event.error);
            continue;
        }

        console.warn(`[WARN] unknown event '${event.type}'`);
    }
}

run();

Notes:

ZeroMQ.js@v5 (deprecated-not-deprecated)

  1. Must have BOTH monitor and connect!
  2. These messages can be parsed with dashtx (npm install --save dashtx) and dashtx-inspect (npm install --location=global dashtx)
"use strict";

let Fs = require("node:fs/promises");

// TODO v6-beta
let zmq = require("zeromq");

// TODO read from JSON config
let config = {
  zmq: "tcp://10.11.5.101:28332",
};

async function main() {
  let zmqSubSocket = zmq.socket("sub");

  console.info(`[INFO] ZMQ opening socket to '${config.zmq}'`);

  // workaround for https://github.com/zeromq/zeromq.js/issues/574
  var timeout = setTimeout(function () {
    // neither accepting nor rejecting - either invalid or firewalled (DROP)
    console.error(`ZMQ address '${config.zmq}' cannot be reached`);
    process.exit(1);
  }, 7 * 1000);

  zmqSubSocket.on("connect", function (fd, endPoint) {
    console.info("ZMQ connected to:", endPoint);
    clearTimeout(timeout);
    timeout = null;
  });

  zmqSubSocket.on("connect_delay", function (fd, endPoint) {
    console.warn("ZMQ connection delay:", endPoint);
    clearTimeout(timeout);
    timeout = null;
  });

  zmqSubSocket.subscribe("rawtxlock");
  zmqSubSocket.on("message", function (topic, message) {
    var topicString = topic.toString("utf8");
    console.log("[ZMQ]", topicString, message.toString("hex"));
  });

  zmqSubSocket.on("monitor_error", function (err) {
    console.warn(
      "Error in monitoring: %s, will restart monitoring in 5 seconds",
      err,
    );
    setTimeout(function () {
      zmqSubSocket.monitor(500, 0);
    }, 5000);
  });

  // IMPORTANT: monitor MUST run for "connect" messages
  zmqSubSocket.monitor(500, 0);
  zmqSubSocket.connect(config.zmq);
}

main().catch(function (err) {
  console.error(err.stack);
  process.exit(1);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant