Skip to content

Commit

Permalink
Merge pull request #68 from nitrictech/feature/remove-sendbatch
Browse files Browse the repository at this point in the history
feat: combine send and send batch functions
  • Loading branch information
jyecusch authored Oct 13, 2021
2 parents 72cfe34 + c11f8d9 commit c2417d3
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 208 deletions.
8 changes: 4 additions & 4 deletions examples/queues/queues-examples.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@
import { queueReceive } from './receive';
import { queueSend } from './send';
import { QueueServiceClient } from '@nitric/api/proto/queue/v1/queue_grpc_pb';
import { QueueSendResponse, QueueReceiveResponse } from '@nitric/api/proto/queue/v1/queue_pb';
import { QueueReceiveResponse, QueueSendBatchResponse } from '@nitric/api/proto/queue/v1/queue_pb';

const proto = QueueServiceClient.prototype;

const CALLBACKFN = (response) => (_, cb: any) => cb(null, response);

describe('test queues snippets', () => {
beforeAll(() => {
jest
.spyOn(proto, 'send')
.mockImplementation(CALLBACKFN(new QueueSendResponse()));
.spyOn(proto, 'sendBatch')
.mockImplementation(CALLBACKFN(new QueueSendBatchResponse()));
jest
.spyOn(proto, 'receive')
.mockImplementation(CALLBACKFN(new QueueReceiveResponse()));
Expand Down
2 changes: 1 addition & 1 deletion examples/queues/send.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import { queues } from '@nitric/sdk';
// [END import]

export async function queueSend() {
export async function queueSend(): Promise<void> {
// [START snippet]
// Publish a task to the queue
const payload = {
Expand Down
156 changes: 17 additions & 139 deletions src/api/queues/v0/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('Queue Client Tests', () => {

beforeAll(() => {
sendMock = jest
.spyOn(GrpcQueueServiceClient.prototype, 'send')
.spyOn(GrpcQueueServiceClient.prototype, 'sendBatch')
.mockImplementation((request, callback: any) => {
callback(MOCK_ERROR, null);

Expand Down Expand Up @@ -66,14 +66,14 @@ describe('Queue Client Tests', () => {
});
});

describe('Given nitric.api.queue.QueueServiceClient.Send succeeds', () => {
describe('Given nitric.api.queue.QueueServiceClient.Send succeeds when an array of tasks are sent', () => {
let sendMock;

beforeAll(() => {
sendMock = jest
.spyOn(GrpcQueueServiceClient.prototype, 'send')
.spyOn(GrpcQueueServiceClient.prototype, 'sendBatch')
.mockImplementation((request, callback: any) => {
const mockResponse = new QueueSendResponse();
const mockResponse = new QueueSendBatchResponse();

callback(null, mockResponse);

Expand All @@ -85,152 +85,30 @@ describe('Queue Client Tests', () => {
jest.resetAllMocks();
});

it('Then Queue.Send should resolve with no failed messages', async () => {
it('Then Queue.Send with an array of tasks should resolve with no failed messages', async () => {
const queueing = new Queueing();
await expect(
queueing.queue('test').send({
queueing.queue('test').send([{
id: 'task',
payloadType: 'test',
payload: { test: 1 },
})
).resolves.toBeUndefined();
});

it('Then Queue.Send should be called once', async () => {
expect(sendMock).toBeCalledTimes(1);
});
});
});

describe('SendBatch', () => {
describe('Given nitric.api.queue.QueueServiceClient.SendBatch throws an error', () => {
const MOCK_ERROR = {
code: 12,
message: 'UNIMPLEMENTED',
};
let sendBatchMock;

beforeAll(() => {
sendBatchMock = jest
.spyOn(GrpcQueueServiceClient.prototype, 'sendBatch')
.mockImplementation((request, callback: any) => {
callback(MOCK_ERROR, null);

return null as any;
});
});

afterAll(() => {
jest.resetAllMocks();
});

it('Then Queue.sendBatch should reject', async () => {
const queueing = new Queueing();
// expect.assertions(1);
await expect(
queueing.queue('test').send([
{
id: 'test',
payloadType: 'Test Payload',
payload: {
test: 'test',
},
},
])
).rejects.toEqual(new UnimplementedError("UNIMPLEMENTED"));
});
});

describe('Given nitric.api.queue.QueueServiceClient.SendBatch succeeds', () => {
beforeAll(() => {
jest
.spyOn(GrpcQueueServiceClient.prototype, 'sendBatch')
.mockImplementation((request, callback: any) => {
const mockResponse = new QueueSendBatchResponse();
mockResponse.setFailedtasksList([]);
callback(null, mockResponse);

return null as any;
});
});

afterAll(() => {
jest.resetAllMocks();
}])
).resolves.toEqual([]);
});

it('Then Queue.sendBatch should resolve with no failed messages', async () => {
it('Then Queue.Send with one task should resolve with no failed messages', async () => {
const queueing = new Queueing();
await expect(
queueing.queue('test').send([
{
id: 'test',
payloadType: 'Test Payload',
payload: {
test: 'test',
},
},
])
).resolves.toEqual([]);
});
});

describe('Given nitric.api.queue.QueueServiceClient.SendBatch partially succeeds', () => {
const mockEvents = [
{
id: 'test',
payloadType: 'Test Payload',
payload: {
test: 'test',
},
},
];

beforeAll(() => {
jest
.spyOn(GrpcQueueServiceClient.prototype, 'sendBatch')
.mockImplementation((request, callback: any) => {
const mockResponse = new QueueSendBatchResponse();
mockResponse.setFailedtasksList(
mockEvents.map((e) => {
const msg = new FailedTask();
const evt = new NitricTask();
evt.setId(e.id);
evt.setPayloadType(e.payloadType);
evt.setPayload(Struct.fromJavaScript(e.payload));
msg.setTask(evt);
msg.setMessage('Failed to Push task');

return msg;
})
);
callback(null, mockResponse);

return null as any;
});
});

afterAll(() => {
jest.resetAllMocks();
queueing.queue('test').send({
id: 'task',
payloadType: 'test',
payload: { test: 1 },
})
).resolves.toEqual(undefined);
});

it('Then EventingClient.publish should resolve with no failed messages', async () => {
const queueing = new Queueing();
await expect(
queueing.queue('test').send([
{
id: 'test',
payloadType: 'Test Payload',
payload: {
test: 'test',
},
},
])
).resolves.toEqual([
{
task: mockEvents[0],
message: 'Failed to Push task',
},
]);
it('Then Queue.Send should be called once', async () => {
expect(sendMock).toBeCalledTimes(2);
});
});
});
Expand Down
87 changes: 23 additions & 64 deletions src/api/queues/v0/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { SERVICE_BIND } from '../../../constants';
import * as grpc from '@grpc/grpc-js';
import type { Task } from '../../../types';
import { Struct } from 'google-protobuf/google/protobuf/struct_pb';
import { fromGrpcError, InvalidArgumentError } from '../../errors';
import { fromGrpcError, InvalidArgumentError, InternalError } from '../../errors';

/**
* A message that has failed to be enqueued
Expand Down Expand Up @@ -104,80 +104,39 @@ export class Queue {
* value: "test"
* };
* });
* ```
*/
send = async (tasks: Task | Task[]): Promise<void | FailedMessage[]> => {
if (Array.isArray(tasks)) {
return this.sendBatch(tasks);
}

return new Promise((resolve, reject) => {
const request = new QueueSendRequest();

request.setTask(taskToWire(tasks));
request.setQueue(this.name);

this.queueing.QueueServiceClient.send(request, (error) => {
if (error) {
reject(fromGrpcError(error));
} else {
resolve();
}
});
});
};

/**
* Send a collection of tasks to a queue, which can be retrieved by other services.
*
* @param tasks the tasks to push to the queue
* @returns a list containing details of any tasks that failed to publish.
*
* Example:
* ```typescript
* import { Queueing } from "@nitric/sdk"
*
* const queueing = new Queueing();
*
* const failedTasks = await queueing.queue("my-queue").sendBatch([{
* payloadType: "my-payload";
* payload: {
* value: "test"
* };
* }]);
*
* // do something with failedTasks
* // console.log(failedTasks);
* ```
*/
private sendBatch = async (tasks: Task[]): Promise<FailedMessage[]> => {
async send(tasks: Task[]): Promise<FailedMessage[]>;
async send(tasks: Task): Promise<void>;
async send(tasks: Task | Task[]): Promise<void | FailedMessage[]> {
return new Promise((resolve, reject) => {
const request = new QueueSendBatchRequest();

const wireTasks = tasks.map(taskToWire);

request.setTasksList(wireTasks);
request.setTasksList(Array.isArray(tasks) ? tasks.map(task => taskToWire(task)) : [taskToWire(tasks)]);
request.setQueue(this.name);

this.queueing.QueueServiceClient.sendBatch(request, (error, response) => {
if (error) {
reject(fromGrpcError(error));
} else {
resolve(
response.getFailedtasksList().map((m) => ({
task: {
id: m.getTask().getId(),
payload: m.getTask().getPayload().toJavaScript(),
payloadType: m.getTask().getPayloadType(),
},
message: m.getMessage(),
}))
);
}
const failedTasks = response.getFailedtasksList().map((m) => ({
task: {
id: m.getTask().getId(),
payload: m.getTask().getPayload().toJavaScript(),
payloadType: m.getTask().getPayloadType(),
},
message: m.getMessage(),
}))
if (!Array.isArray(tasks)) { // Single Task returns
if (failedTasks.length > 0) {
reject(new InternalError(failedTasks[0].message));
}
resolve();
} else { // Array of Tasks return
resolve(failedTasks)
}
});
});
};

});
}
/**
* Pop 1 or more queue items from the specified queue up to the depth limit.
*
Expand Down

0 comments on commit c2417d3

Please sign in to comment.