diff --git a/README.md b/README.md index 6250cc6..39f44fd 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Connection management for amqplib. * Automatically reconnect when your amqplib broker dies in a fire. * Round-robin connections between multiple brokers in a cluster. * If messages are sent while the broker is unavailable, queues messages in memory until we reconnect. +* queued message are persisted to disk in case of unexpected crash/reboot, and recovered in memory. * Supports both promises and callbacks (using [promise-breaker](https://github.com/jwalton/node-promise-breaker)) * Very un-opinionated library - a thin wrapper around amqplib. @@ -133,6 +134,8 @@ Options: arbitrary data in. * `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()` are plain JSON objects. These will be encoded automatically before being sent. +* `options.swap_path` if defined, then ChannelWrapper will persist the array of queued messages to disk. +* `options.swap_size` the size of the storage (defaults to `50000`) see [node-localstorage](https://www.npmjs.com/package/node-localstorage). ### AmqpConnectionManager#isConnected() diff --git a/package-lock.json b/package-lock.json index 0dc700b..1fe5980 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1715,6 +1715,14 @@ "integrity": "sha512-+iTbntw2IZPb/anVDbypzfQa+ay64MW0Zo8aJ8gZPWMMK6/OubMVb6lUPMagqjOPnmtauXnFCACVl3O7ogjeqQ==", "dev": true }, + "@trusk/array-to-disk": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@trusk/array-to-disk/-/array-to-disk-1.1.0.tgz", + "integrity": "sha512-Tx3RVttvVBodfqR7Am/bawbb8JaS4HGQAgdgELmWvcjYO79idiACWArGJuoPx9XM0aja1UGAl+/Hv334rkaj3Q==", + "requires": { + "node-localstorage": "1.3.1" + } + }, "@types/events": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/@types/events/-/events-3.0.0.tgz", @@ -4694,8 +4702,7 @@ "graceful-fs": { "version": "4.1.15", "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.1.15.tgz", - "integrity": "sha512-6uHUhOPEBgQ24HM+r6b/QwWfZq+yiFcipKFrOFiBEnWdy5sdzYoi+pJeQaPI5qOLRFqWmAXUPQNsielzdLoecA==", - "dev": true + "integrity": "sha512-6uHUhOPEBgQ24HM+r6b/QwWfZq+yiFcipKFrOFiBEnWdy5sdzYoi+pJeQaPI5qOLRFqWmAXUPQNsielzdLoecA==" }, "greenkeeper-lockfile": { "version": "1.15.1", @@ -5078,8 +5085,7 @@ "imurmurhash": { "version": "0.1.4", "resolved": "https://registry.npmjs.org/imurmurhash/-/imurmurhash-0.1.4.tgz", - "integrity": "sha1-khi5srkoojixPcT7a21XbyMUU+o=", - "dev": true + "integrity": "sha1-khi5srkoojixPcT7a21XbyMUU+o=" }, "indent-string": { "version": "3.2.0", @@ -6369,6 +6375,26 @@ "integrity": "sha512-8dG4H5ujfvFiqDmVu9fQ5bOHUC15JMjMY/Zumv26oOvvVJjM67KF8koCWIabKQ1GJIa9r2mMZscBq/TbdOcmNA==", "dev": true }, + "node-localstorage": { + "version": "1.3.1", + "resolved": "https://registry.npmjs.org/node-localstorage/-/node-localstorage-1.3.1.tgz", + "integrity": "sha512-NMWCSWWc6JbHT5PyWlNT2i8r7PgGYXVntmKawY83k/M0UJScZ5jirb61TLnqKwd815DfBQu+lR3sRw08SPzIaQ==", + "requires": { + "write-file-atomic": "^1.1.4" + }, + "dependencies": { + "write-file-atomic": { + "version": "1.3.4", + "resolved": "https://registry.npmjs.org/write-file-atomic/-/write-file-atomic-1.3.4.tgz", + "integrity": "sha1-+Aek8LHZ6ROuekgRLmzDrxmRtF8=", + "requires": { + "graceful-fs": "^4.1.11", + "imurmurhash": "^0.1.4", + "slide": "^1.1.5" + } + } + } + }, "node-modules-regexp": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/node-modules-regexp/-/node-modules-regexp-1.0.0.tgz", @@ -11400,6 +11426,11 @@ "is-fullwidth-code-point": "^2.0.0" } }, + "slide": { + "version": "1.1.6", + "resolved": "https://registry.npmjs.org/slide/-/slide-1.1.6.tgz", + "integrity": "sha1-VusCfWW00tzmyy4tMsTUr8nh1wc=" + }, "snapdragon": { "version": "0.8.2", "resolved": "https://registry.npmjs.org/snapdragon/-/snapdragon-0.8.2.tgz", diff --git a/package.json b/package.json index 3434333..1e45090 100644 --- a/package.json +++ b/package.json @@ -4,6 +4,7 @@ "description": "Auto-reconnect and round robin support for amqplib.", "main": "lib/index.js", "dependencies": { + "@trusk/array-to-disk": "1.1.0", "promise-breaker": "^5.0.0" }, "peerDependencies": { diff --git a/src/ChannelWrapper.js b/src/ChannelWrapper.js index de20f48..951a053 100644 --- a/src/ChannelWrapper.js +++ b/src/ChannelWrapper.js @@ -1,5 +1,8 @@ import { EventEmitter } from 'events'; import pb from 'promise-breaker'; +import DiskArray from '@trusk/array-to-disk'; +import rimraf from "rimraf"; +import path from "path"; /** * Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and @@ -161,7 +164,36 @@ export default class ChannelWrapper extends EventEmitter { this._json = ('json' in options) ? options.json : false; // Place to store queued messages. - this._messages = []; + this._messages = new DiskArray(options.swap_path, options.swap_size); + this._messages.setEventEmitter(this, "droppedMessage"); + if (options.swap_path) { + rimraf.sync(`${path.resolve(options.swap_path)}/data.*`); + } + const messages_to_republish = []; + while (this._messages.length) { + messages_to_republish.push(this._messages.shift()); + } + while (messages_to_republish.length) { + const mtr = messages_to_republish[0]; + if (mtr && mtr.content && ["publish", "sendToQueue"].includes(mtr.type)) { + const content = + mtr.content.type === "Buffer" + ? Buffer.from(mtr.content) + : mtr.content || null; + const arg = [ + ...(mtr.type === "publish" + ? [mtr.exchange, mtr.routingKey || "", content, mtr.options] + : []), + ...(mtr.type === "sendToQueue" + ? [mtr.queue || "", content, mtr.options] + : []) + ]; + this[mtr.type]( + ...arg + ); + } + messages_to_republish.shift(); + } // Place to store published, but not yet confirmed messages this._unconfirmedMessages = []; diff --git a/test/ChannelWrapperTest.js b/test/ChannelWrapperTest.js index 75c76aa..d82c535 100644 --- a/test/ChannelWrapperTest.js +++ b/test/ChannelWrapperTest.js @@ -506,6 +506,7 @@ describe('ChannelWrapper', function() { }); it('should encode JSON messages', function() { + // require("fs").writeFileSync("./swap/data", "[]"); // clean the swap connectionManager.simulateConnect(); const channelWrapper = new ChannelWrapper(connectionManager, { json: true