Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disk persistance for _messages in case of service crash/reboot while disconnected from the amqp server #94

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Connection management for amqplib.
* Automatically reconnect when your amqplib broker dies in a fire.
* Round-robin connections between multiple brokers in a cluster.
* If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
* queued message are persisted to disk in case of unexpected crash/reboot, and recovered in memory.
* Supports both promises and callbacks (using [promise-breaker](https://github.com/jwalton/node-promise-breaker))
* Very un-opinionated library - a thin wrapper around amqplib.

Expand Down Expand Up @@ -133,6 +134,8 @@ Options:
arbitrary data in.
* `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()`
are plain JSON objects. These will be encoded automatically before being sent.
* `options.swap_path` if defined, then ChannelWrapper will persist the array of queued messages to disk.
* `options.swap_size` the size of the storage (defaults to `50000`) see [node-localstorage](https://www.npmjs.com/package/node-localstorage).

### AmqpConnectionManager#isConnected()

Expand Down
39 changes: 35 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"description": "Auto-reconnect and round robin support for amqplib.",
"main": "lib/index.js",
"dependencies": {
"@trusk/array-to-disk": "1.1.0",
"promise-breaker": "^5.0.0"
},
"peerDependencies": {
Expand Down
34 changes: 33 additions & 1 deletion src/ChannelWrapper.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { EventEmitter } from 'events';
import pb from 'promise-breaker';
import DiskArray from '@trusk/array-to-disk';
import rimraf from "rimraf";
import path from "path";

/**
* Calls to `publish()` or `sendToQueue()` work just like in amqplib, but messages are queued internally and
Expand Down Expand Up @@ -161,7 +164,36 @@ export default class ChannelWrapper extends EventEmitter {
this._json = ('json' in options) ? options.json : false;

// Place to store queued messages.
this._messages = [];
this._messages = new DiskArray(options.swap_path, options.swap_size);
this._messages.setEventEmitter(this, "droppedMessage");
if (options.swap_path) {
rimraf.sync(`${path.resolve(options.swap_path)}/data.*`);
}
const messages_to_republish = [];
while (this._messages.length) {
messages_to_republish.push(this._messages.shift());
}
while (messages_to_republish.length) {
const mtr = messages_to_republish[0];
if (mtr && mtr.content && ["publish", "sendToQueue"].includes(mtr.type)) {
const content =
mtr.content.type === "Buffer"
? Buffer.from(mtr.content)
: mtr.content || null;
const arg = [
...(mtr.type === "publish"
? [mtr.exchange, mtr.routingKey || "", content, mtr.options]
: []),
...(mtr.type === "sendToQueue"
? [mtr.queue || "", content, mtr.options]
: [])
];
this[mtr.type](
...arg
);
}
messages_to_republish.shift();
}

// Place to store published, but not yet confirmed messages
this._unconfirmedMessages = [];
Expand Down
1 change: 1 addition & 0 deletions test/ChannelWrapperTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ describe('ChannelWrapper', function() {
});

it('should encode JSON messages', function() {
// require("fs").writeFileSync("./swap/data", "[]"); // clean the swap
connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
json: true
Expand Down