diff --git a/examples/queues/queues-examples.test.ts b/examples/queues/queues-examples.test.ts index 1f101c9e..4184c09f 100644 --- a/examples/queues/queues-examples.test.ts +++ b/examples/queues/queues-examples.test.ts @@ -14,17 +14,17 @@ import { queueReceive } from './receive'; import { queueSend } from './send'; import { QueueServiceClient } from '@nitric/api/proto/queue/v1/queue_grpc_pb'; -import { QueueSendResponse, QueueReceiveResponse } from '@nitric/api/proto/queue/v1/queue_pb'; +import { QueueReceiveResponse, QueueSendBatchResponse } from '@nitric/api/proto/queue/v1/queue_pb'; const proto = QueueServiceClient.prototype; const CALLBACKFN = (response) => (_, cb: any) => cb(null, response); - + describe('test queues snippets', () => { beforeAll(() => { jest - .spyOn(proto, 'send') - .mockImplementation(CALLBACKFN(new QueueSendResponse())); + .spyOn(proto, 'sendBatch') + .mockImplementation(CALLBACKFN(new QueueSendBatchResponse())); jest .spyOn(proto, 'receive') .mockImplementation(CALLBACKFN(new QueueReceiveResponse())); diff --git a/examples/queues/send.ts b/examples/queues/send.ts index 147bc7ad..cfeb5779 100644 --- a/examples/queues/send.ts +++ b/examples/queues/send.ts @@ -15,7 +15,7 @@ import { queues } from '@nitric/sdk'; // [END import] -export async function queueSend() { +export async function queueSend(): Promise { // [START snippet] // Publish a task to the queue const payload = { diff --git a/src/api/queues/v0/queues.test.ts b/src/api/queues/v0/queues.test.ts index fb080006..5f6de73e 100644 --- a/src/api/queues/v0/queues.test.ts +++ b/src/api/queues/v0/queues.test.ts @@ -37,7 +37,7 @@ describe('Queue Client Tests', () => { beforeAll(() => { sendMock = jest - .spyOn(GrpcQueueServiceClient.prototype, 'send') + .spyOn(GrpcQueueServiceClient.prototype, 'sendBatch') .mockImplementation((request, callback: any) => { callback(MOCK_ERROR, null); @@ -66,14 +66,14 @@ describe('Queue Client Tests', () => { }); }); - describe('Given nitric.api.queue.QueueServiceClient.Send succeeds', () => { + describe('Given nitric.api.queue.QueueServiceClient.Send succeeds when an array of tasks are sent', () => { let sendMock; beforeAll(() => { sendMock = jest - .spyOn(GrpcQueueServiceClient.prototype, 'send') + .spyOn(GrpcQueueServiceClient.prototype, 'sendBatch') .mockImplementation((request, callback: any) => { - const mockResponse = new QueueSendResponse(); + const mockResponse = new QueueSendBatchResponse(); callback(null, mockResponse); @@ -85,152 +85,30 @@ describe('Queue Client Tests', () => { jest.resetAllMocks(); }); - it('Then Queue.Send should resolve with no failed messages', async () => { + it('Then Queue.Send with an array of tasks should resolve with no failed messages', async () => { const queueing = new Queueing(); await expect( - queueing.queue('test').send({ + queueing.queue('test').send([{ id: 'task', payloadType: 'test', payload: { test: 1 }, - }) - ).resolves.toBeUndefined(); - }); - - it('Then Queue.Send should be called once', async () => { - expect(sendMock).toBeCalledTimes(1); - }); - }); - }); - - describe('SendBatch', () => { - describe('Given nitric.api.queue.QueueServiceClient.SendBatch throws an error', () => { - const MOCK_ERROR = { - code: 12, - message: 'UNIMPLEMENTED', - }; - let sendBatchMock; - - beforeAll(() => { - sendBatchMock = jest - .spyOn(GrpcQueueServiceClient.prototype, 'sendBatch') - .mockImplementation((request, callback: any) => { - callback(MOCK_ERROR, null); - - return null as any; - }); - }); - - afterAll(() => { - jest.resetAllMocks(); - }); - - it('Then Queue.sendBatch should reject', async () => { - const queueing = new Queueing(); - // expect.assertions(1); - await expect( - queueing.queue('test').send([ - { - id: 'test', - payloadType: 'Test Payload', - payload: { - test: 'test', - }, - }, - ]) - ).rejects.toEqual(new UnimplementedError("UNIMPLEMENTED")); - }); - }); - - describe('Given nitric.api.queue.QueueServiceClient.SendBatch succeeds', () => { - beforeAll(() => { - jest - .spyOn(GrpcQueueServiceClient.prototype, 'sendBatch') - .mockImplementation((request, callback: any) => { - const mockResponse = new QueueSendBatchResponse(); - mockResponse.setFailedtasksList([]); - callback(null, mockResponse); - - return null as any; - }); - }); - - afterAll(() => { - jest.resetAllMocks(); + }]) + ).resolves.toEqual([]); }); - it('Then Queue.sendBatch should resolve with no failed messages', async () => { + it('Then Queue.Send with one task should resolve with no failed messages', async () => { const queueing = new Queueing(); await expect( - queueing.queue('test').send([ - { - id: 'test', - payloadType: 'Test Payload', - payload: { - test: 'test', - }, - }, - ]) - ).resolves.toEqual([]); - }); - }); - - describe('Given nitric.api.queue.QueueServiceClient.SendBatch partially succeeds', () => { - const mockEvents = [ - { - id: 'test', - payloadType: 'Test Payload', - payload: { - test: 'test', - }, - }, - ]; - - beforeAll(() => { - jest - .spyOn(GrpcQueueServiceClient.prototype, 'sendBatch') - .mockImplementation((request, callback: any) => { - const mockResponse = new QueueSendBatchResponse(); - mockResponse.setFailedtasksList( - mockEvents.map((e) => { - const msg = new FailedTask(); - const evt = new NitricTask(); - evt.setId(e.id); - evt.setPayloadType(e.payloadType); - evt.setPayload(Struct.fromJavaScript(e.payload)); - msg.setTask(evt); - msg.setMessage('Failed to Push task'); - - return msg; - }) - ); - callback(null, mockResponse); - - return null as any; - }); - }); - - afterAll(() => { - jest.resetAllMocks(); + queueing.queue('test').send({ + id: 'task', + payloadType: 'test', + payload: { test: 1 }, + }) + ).resolves.toEqual(undefined); }); - it('Then EventingClient.publish should resolve with no failed messages', async () => { - const queueing = new Queueing(); - await expect( - queueing.queue('test').send([ - { - id: 'test', - payloadType: 'Test Payload', - payload: { - test: 'test', - }, - }, - ]) - ).resolves.toEqual([ - { - task: mockEvents[0], - message: 'Failed to Push task', - }, - ]); + it('Then Queue.Send should be called once', async () => { + expect(sendMock).toBeCalledTimes(2); }); }); }); diff --git a/src/api/queues/v0/queues.ts b/src/api/queues/v0/queues.ts index 18a7c588..740688b2 100644 --- a/src/api/queues/v0/queues.ts +++ b/src/api/queues/v0/queues.ts @@ -23,7 +23,7 @@ import { SERVICE_BIND } from '../../../constants'; import * as grpc from '@grpc/grpc-js'; import type { Task } from '../../../types'; import { Struct } from 'google-protobuf/google/protobuf/struct_pb'; -import { fromGrpcError, InvalidArgumentError } from '../../errors'; +import { fromGrpcError, InvalidArgumentError, InternalError } from '../../errors'; /** * A message that has failed to be enqueued @@ -104,80 +104,39 @@ export class Queue { * value: "test" * }; * }); - * ``` */ - send = async (tasks: Task | Task[]): Promise => { - if (Array.isArray(tasks)) { - return this.sendBatch(tasks); - } - - return new Promise((resolve, reject) => { - const request = new QueueSendRequest(); - - request.setTask(taskToWire(tasks)); - request.setQueue(this.name); - - this.queueing.QueueServiceClient.send(request, (error) => { - if (error) { - reject(fromGrpcError(error)); - } else { - resolve(); - } - }); - }); - }; - - /** - * Send a collection of tasks to a queue, which can be retrieved by other services. - * - * @param tasks the tasks to push to the queue - * @returns a list containing details of any tasks that failed to publish. - * - * Example: - * ```typescript - * import { Queueing } from "@nitric/sdk" - * - * const queueing = new Queueing(); - * - * const failedTasks = await queueing.queue("my-queue").sendBatch([{ - * payloadType: "my-payload"; - * payload: { - * value: "test" - * }; - * }]); - * - * // do something with failedTasks - * // console.log(failedTasks); - * ``` - */ - private sendBatch = async (tasks: Task[]): Promise => { + async send(tasks: Task[]): Promise; + async send(tasks: Task): Promise; + async send(tasks: Task | Task[]): Promise { return new Promise((resolve, reject) => { const request = new QueueSendBatchRequest(); - const wireTasks = tasks.map(taskToWire); - - request.setTasksList(wireTasks); + request.setTasksList(Array.isArray(tasks) ? tasks.map(task => taskToWire(task)) : [taskToWire(tasks)]); request.setQueue(this.name); this.queueing.QueueServiceClient.sendBatch(request, (error, response) => { if (error) { reject(fromGrpcError(error)); - } else { - resolve( - response.getFailedtasksList().map((m) => ({ - task: { - id: m.getTask().getId(), - payload: m.getTask().getPayload().toJavaScript(), - payloadType: m.getTask().getPayloadType(), - }, - message: m.getMessage(), - })) - ); + } + const failedTasks = response.getFailedtasksList().map((m) => ({ + task: { + id: m.getTask().getId(), + payload: m.getTask().getPayload().toJavaScript(), + payloadType: m.getTask().getPayloadType(), + }, + message: m.getMessage(), + })) + if (!Array.isArray(tasks)) { // Single Task returns + if (failedTasks.length > 0) { + reject(new InternalError(failedTasks[0].message)); + } + resolve(); + } else { // Array of Tasks return + resolve(failedTasks) } }); - }); - }; - + }); + } /** * Pop 1 or more queue items from the specified queue up to the depth limit. *