Skip to content

Commit

Permalink
Add Defer method to handler ackFns arg to allow sync handlers to call…
Browse files Browse the repository at this point in the history
… async functions and ack/nack once async work is complete
  • Loading branch information
James Carlson committed Feb 5, 2019
1 parent c915fee commit 9e22e41
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
3 changes: 3 additions & 0 deletions Bus.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export declare class Bus implements IBus {
}, ackFns?: {
ack: () => void;
nack: () => void;
defer: () => void;
}) => void, withTopic?: string): Promise<IConsumerDispose>;
Send(queue: string, msg: {
TypeID: string;
Expand All @@ -42,6 +43,7 @@ export declare class Bus implements IBus {
}, ackFns?: {
ack: () => void;
nack: () => void;
defer: () => void;
}) => void): Promise<IConsumerDispose>;
ReceiveTypes(queue: string, handlers: {
rxType: {
Expand All @@ -52,6 +54,7 @@ export declare class Bus implements IBus {
}, ackFns?: {
ack: () => void;
nack: () => void;
defer: () => void;
}) => void;
}[]): Promise<IConsumerDispose>;
Request(request: {
Expand Down
22 changes: 17 additions & 5 deletions Bus.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 20 additions & 8 deletions Bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export class Bus implements IBus {
public Subscribe(
type: { TypeID: string },
subscriberName: string,
handler: (msg: { TypeID: string }, ackFns?: { ack: () => void; nack: () => void }) => void,
handler: (msg: { TypeID: string }, ackFns?: { ack: () => void; nack: () => void; defer: () => void }) => void,
withTopic: string = '#'):
Promise<IConsumerDispose>
{
Expand Down Expand Up @@ -123,6 +123,7 @@ export class Bus implements IBus {
_msg.TypeID = _msg.TypeID || msg.properties.type; //so we can get non-BusMessage events

var ackdOrNackd = false;
var deferred = false;

handler(_msg, {
ack: () => {
Expand All @@ -138,10 +139,13 @@ export class Bus implements IBus {
this.SendToErrorQueue(_msg, 'attempted to nack previously nack\'d message');
}
ackdOrNackd = true;
}
},
defer: () => {
deferred = true;
},
});

if (!ackdOrNackd) channel.ack(msg);
if (!ackdOrNackd && !deferred) channel.ack(msg);
}
else {
this.SendToErrorQueue(_msg, util.format('mismatched TypeID: %s !== %s', msg.properties.type, type.TypeID));
Expand Down Expand Up @@ -184,7 +188,7 @@ export class Bus implements IBus {
public Receive(
rxType: { TypeID: string },
queue: string,
handler: (msg: { TypeID: string }, ackFns?: { ack: () => void; nack: () => void }) => void):
handler: (msg: { TypeID: string }, ackFns?: { ack: () => void; nack: () => void; defer: () => void }) => void):
Promise<IConsumerDispose>
{
var channel = null;
Expand All @@ -204,6 +208,7 @@ export class Bus implements IBus {
_msg.TypeID = _msg.TypeID || msg.properties.type; //so we can get non-BusMessage events

var ackdOrNackd = false;
var deferred = false;

handler(_msg, {
ack: () => {
Expand All @@ -219,10 +224,13 @@ export class Bus implements IBus {
this.SendToErrorQueue(_msg, 'attempted to nack previously nack\'d message');
}
ackdOrNackd = true;
},
defer: () => {
deferred = false;
}
});

if (!ackdOrNackd) channel.ack(msg);
if (!ackdOrNackd && !deferred) channel.ack(msg);
}
else {
this.SendToErrorQueue(_msg, util.format('mismatched TypeID: %s !== %s', msg.properties.type, rxType.TypeID))
Expand Down Expand Up @@ -254,7 +262,7 @@ export class Bus implements IBus {

public ReceiveTypes(
queue: string,
handlers: { rxType: { TypeID: string }; handler: (msg: { TypeID: string }, ackFns?: { ack: () => void; nack: () => void }) => void }[]):
handlers: { rxType: { TypeID: string }; handler: (msg: { TypeID: string }, ackFns?: { ack: () => void; nack: () => void, defer: () => void }) => void }[]):
Promise<IConsumerDispose>
{
var channel = null;
Expand All @@ -272,6 +280,7 @@ export class Bus implements IBus {
_msg.TypeID = _msg.TypeID || msg.properties.type; //so we can get non-BusMessage events

var ackdOrNackd = false;
var deferred = false;

handler.handler(_msg, {
ack: () => {
Expand All @@ -287,10 +296,13 @@ export class Bus implements IBus {
this.SendToErrorQueue(_msg, 'attempted to nack previously nack\'d message');
}
ackdOrNackd = true;
}
},
defer: () => {
deferred = true;
},
});

if (!ackdOrNackd) channel.ack(msg);
if (!ackdOrNackd && !deferred) channel.ack(msg);
});
})
.then((ctag) => {
Expand Down

0 comments on commit 9e22e41

Please sign in to comment.