Skip to content

Commit

Permalink
ADD initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
vinayak25 committed Oct 30, 2020
0 parents commit 6142bf8
Show file tree
Hide file tree
Showing 26 changed files with 1,052 additions and 0 deletions.
19 changes: 19 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# dependencies
/node_modules

# IDE
/.idea
/.awcache
/.vscode

# misc
npm-debug.log
.DS_Store

# tests
/test
/coverage
/.nyc_output

# dist
dist
18 changes: 18 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# source
lib
tests
index.ts
package-lock.json
tslint.json
tsconfig.json
.prettierrc

# github
.github
CONTRIBUTING.MD

# misc
.commitlintrc.json
.release-it.json
.eslintignore
.eslintrc.js
3 changes: 3 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export const QUEUE_OPTIONS = '__QUEUE_OPTIONS__';
export const JOB_NAME = '__JOB_NAME__';
export const JOB_OPTIONS = '__JOB_OPTIONS__';
1 change: 1 addition & 0 deletions lib/core/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './payloadBuilder';
36 changes: 36 additions & 0 deletions lib/core/payloadBuilder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { InternalMessage, JobOptions, Message } from "../interfaces";
import { QueueMetadata } from "../metadata";

type Complete<T> = {
[P in keyof Required<T>]: Pick<T, P> extends Required<Pick<T, P>>
? T[P]
: T[P] | undefined;
};

export class PayloadBuilder {
static build(
message: Message,
jobOptions: JobOptions
): Complete<InternalMessage> {
const defaultOptions = QueueMetadata.getDefaultOptions();
const payload = {
attemptCount: 0,
...defaultOptions,
queue: undefined,
...jobOptions,
...message,
} as Complete<InternalMessage>;

payload.connection = payload.connection || defaultOptions.connection;

if (!payload.queue) {
const config = QueueMetadata.getData();
payload.queue =
payload.connection != undefined
? config.connections[payload.connection].queue
: undefined;
}

return payload;
}
}
11 changes: 11 additions & 0 deletions lib/decorators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import 'reflect-metadata';
import { JOB_NAME, JOB_OPTIONS } from './constants';
import { JobOptions } from './interfaces';

export function Job(job: string, options?: JobOptions) {
options = options || {};
return function(target: Record<string, any>, propertyKey: string) {
Reflect.defineMetadata(JOB_NAME, job, target, propertyKey);
Reflect.defineMetadata(JOB_OPTIONS, options, target, propertyKey);
};
}
53 changes: 53 additions & 0 deletions lib/drivers/sqs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { InternalMessage, QueueDriver, SqsBrokerOption } from "../interfaces";
import AWS = require("aws-sdk");
import { SqsJob } from "../jobs/sqsJob";

export class SqsQueueDriver implements QueueDriver {
private client: AWS.SQS;
private queueUrl: string;

constructor(private options: SqsBrokerOption) {
AWS.config.update({ region: options.region });
const credential = new AWS.SharedIniFileCredentials({
profile: options.profile,
});
AWS.config.credentials = credential;
this.client = new AWS.SQS({ apiVersion: options.apiVersion });
this.queueUrl = options.prefix + "/" + options.queue;
}

async push(message: string, rawPayload: InternalMessage): Promise<void> {
const params = {
DelaySeconds: rawPayload.delay,
MessageBody: message,
QueueUrl: this.options.prefix + "/" + rawPayload.queue,
};

await this.client.sendMessage(params).promise().then();
return;
}

async pull(options: Record<string, any>): Promise<SqsJob | null> {
const params = {
MaxNumberOfMessages: 1,
MessageAttributeNames: ["All"],
QueueUrl: this.options.prefix + "/" + options.queue,
VisibilityTimeout: 30,
WaitTimeSeconds: 0,
};
const response = await this.client.receiveMessage(params).promise();
const message = response.Messages ? response.Messages[0] : null;
return message ? new SqsJob(message) : null;
}

async remove(job: SqsJob, options: Record<string, any>): Promise<void> {
const params = {
QueueUrl: this.options.prefix + "/" + options.queue,
ReceiptHandle: job.data.ReceiptHandle,
};

await this.client.deleteMessage(params).promise();

return;
}
}
42 changes: 42 additions & 0 deletions lib/explorer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import { DiscoveryService, MetadataScanner } from '@nestjs/core';
import { JOB_NAME, JOB_OPTIONS } from './constants';
import { QueueMetadata } from './metadata';

@Injectable()
export class QueueExplorer implements OnModuleInit {
constructor(
private readonly discovery: DiscoveryService,
private readonly metadataScanner: MetadataScanner,
) {}

onModuleInit() {
const wrappers = this.discovery.getProviders();
wrappers.forEach(w => {
const { instance } = w;
if (
!instance ||
typeof instance === 'string' ||
!Object.getPrototypeOf(instance)
) {
return;
}
this.metadataScanner.scanFromPrototype(
instance,
Object.getPrototypeOf(instance),
(key: string) => this.lookupJobs(instance, key),
);
});
}

lookupJobs(instance: Record<string, Function>, key: string) {
const methodRef = instance[key];
const hasJobMeta = Reflect.hasMetadata(JOB_NAME, instance, key);
if (!hasJobMeta) return;
const jobName = Reflect.getMetadata(JOB_NAME, instance, key);
QueueMetadata.addJob(jobName, {
options: Reflect.getMetadata(JOB_OPTIONS, instance, key),
target: methodRef.bind(instance),
});
}
}
7 changes: 7 additions & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export * from './module';
export * from './interfaces';
export * from './service';
export * from './queue';
export * from './decorators';
export * from './worker';
export * from './listener';
10 changes: 10 additions & 0 deletions lib/interfaces/driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { DriverJob } from "../jobs";
import { Message } from "./Message";

export interface QueueDriver {
push(message: string, rawMessage: Message): Promise<void>;

pull(options: Record<string, any>): Promise<DriverJob | null>;

remove(job: DriverJob, options: Record<string, any>): Promise<void>;
}
4 changes: 4 additions & 0 deletions lib/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './options';
export * from './message';
export * from './driver';
export * from './job';
1 change: 1 addition & 0 deletions lib/interfaces/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class BaseJob {}
18 changes: 18 additions & 0 deletions lib/interfaces/message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
export type Payload = Record<string, any> | string | number;

export interface JobOptions {
delay?: number;
tries?: number;
queue?: string;
timeout?: number;
connection?: string;
}

export interface Message extends JobOptions {
job: string;
data: Payload | Payload[];
}

export interface InternalMessage extends Message {
attemptCount: number;
}
38 changes: 38 additions & 0 deletions lib/interfaces/options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { ModuleMetadata, Type } from "@nestjs/common";

export interface SqsBrokerOption {
driver: "sqs";
profile: string;
apiVersion: string;
prefix: string;
queue: string;
suffix?: string;
region: string;
}

export interface QueueOptions {
isGlobal?: boolean;
default: string;
connections: {
[key: string]: SqsBrokerOption;
};
}

export interface QueueAsyncOptionsFactory {
createQueueOptions(): Promise<QueueOptions> | QueueOptions;
}

export interface QueueAsyncOptions extends Pick<ModuleMetadata, "imports"> {
name?: string;
isGlobal: boolean;
useExisting?: Type<QueueOptions>;
useClass?: Type<QueueAsyncOptionsFactory>;
useFactory?: (...args: any[]) => Promise<QueueOptions> | QueueOptions;
inject?: any[];
}

export interface ListenerOptions {
sleep?: number;
connection?: string;
queue?: string;
}
5 changes: 5 additions & 0 deletions lib/jobs/driverJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export abstract class DriverJob {
constructor(public data: Record<string, any>) {}

public abstract getMessage(): string;
}
2 changes: 2 additions & 0 deletions lib/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './sqsJob';
export * from './driverJob';
7 changes: 7 additions & 0 deletions lib/jobs/sqsJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { DriverJob } from './driverJob';

export class SqsJob extends DriverJob {
public getMessage(): string {
return this.data.Body;
}
}
50 changes: 50 additions & 0 deletions lib/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { ListenerOptions, QueueDriver } from "./interfaces";
import { DriverJob } from "./jobs";
import { QueueMetadata } from "./metadata";
import { QueueService } from "./service";
import { QueueWorker } from "./worker";

export class QueueListener {
private options: ListenerOptions;

constructor(options?: ListenerOptions) {
const defaultOptions = QueueMetadata.getDefaultOptions();
this.options = options || {};
this.options = {
...defaultOptions,
queue: undefined,
...this.options,
};

if (!this.options.queue) {
const data = QueueMetadata.getData();
this.options["queue"] =
data.connections[
this.options.connection || defaultOptions.connection
].queue;
}
}

static init(options?: ListenerOptions): QueueListener {
return new QueueListener(options);
}

private async poll(connection: QueueDriver): Promise<DriverJob | null> {
const job = await connection.pull({ queue: this.options.queue });
return job;
}

async run() {
const connection = QueueService.getConnection(this.options.connection);
const worker = new QueueWorker(this.options, connection);

while (1) {
const job = await this.poll(connection);
if (job) {
await worker.run(job);
} else {
await new Promise((resolve) => setTimeout(resolve, this.options.sleep));
}
}
}
}
43 changes: 43 additions & 0 deletions lib/metadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { Inject, Injectable } from "@nestjs/common";
import { QUEUE_OPTIONS } from "./constants";
import { QueueOptions, JobOptions, InternalMessage } from "./interfaces";

interface JobTarget {
options: JobOptions;
target: Function;
}

@Injectable()
export class QueueMetadata {
private static data: QueueOptions;
private static defaultOptions: Record<string, string | number | undefined>;
private static store: Record<string, any> = { jobs: {} };

constructor(@Inject(QUEUE_OPTIONS) data: QueueOptions) {
QueueMetadata.data = data;
QueueMetadata.defaultOptions = {
connection: data.default,
queue: data.connections[data.default].queue,
delay: 10,
tries: 5,
timeout: 30,
sleep: 5000,
};
}

static getDefaultOptions(): Record<string, any> {
return QueueMetadata.defaultOptions;
}

static getData(): QueueOptions {
return QueueMetadata.data;
}

static addJob(jobName: string, target: JobTarget): void {
QueueMetadata.store.jobs[jobName] = target;
}

static getJob(jobName: string): JobTarget {
return QueueMetadata.store.jobs[jobName];
}
}
Loading

0 comments on commit 6142bf8

Please sign in to comment.