Skip to content

Commit

Permalink
feat(Statements): Add statement processing priority flag (#824)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazy-grizzly authored Sep 20, 2021
2 parents 3243fd4 + 7c824c1 commit e615e66
Show file tree
Hide file tree
Showing 29 changed files with 356 additions and 52 deletions.
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,10 @@ EXPRESS_PORT=8081
#WINSTON_CLOUDWATCH_ACCESS_KEY_ID=
#WINSTON_CLOUDWATCH_SECRET_ACCESS_KEY=
#WINSTON_CLOUDWATCH_REGION=

###############################
# Statement handling priority #
###############################

# Uncomment next line if you want to enable statement handling priority
#ENABLE_QUEUE_PRIORITY=true
4 changes: 4 additions & 0 deletions src/apps/statements/enums/statementProcessingPriority.enum.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum StatementProcessingPriority {
LOW = 'LOW',
MEDIUM = 'MEDIUM',
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Request, Response } from 'express';
import { defaultTo, get } from 'lodash';
import { parse as parseQueryString } from 'query-string';
import streamToString from 'stream-to-string';
import { StatementProcessingPriority } from '../../enums/statementProcessingPriority.enum';
import InvalidContentType from '../../errors/InvalidContentType';
import InvalidMethod from '../../errors/InvalidMethod';
import parseJson from '../../utils/parseJson';
Expand All @@ -13,6 +14,7 @@ import getStatements from '../utils/getStatements';
import getUrlPath from '../utils/getUrlPath';
import storeStatement from '../utils/storeStatement';
import validateVersionHeader from '../utils/validateHeaderVersion';
import { validateStatementProcessingPriority } from '../utils/validateStatementProcessingPriority';
import storeStatements from './storeStatements';

export interface Options {
Expand All @@ -24,15 +26,16 @@ export interface Options {

const checkContentType = (bodyParams: any) => {
const contentType = get(bodyParams, 'Content-Type', 'application/json');

if (!jsonContentTypePattern.test(contentType)) {
throw new InvalidContentType(contentType);
}
};

const getBodyContent = (bodyParams: any) => {
const unparsedBody = get(bodyParams, 'content', '');
const body = parseJson(unparsedBody, ['body', 'content']);
return body;

return parseJson(unparsedBody, ['body', 'content']);
};

const getHeader = (bodyParams: any, req: Request, name: string): string => {
Expand All @@ -41,22 +44,31 @@ const getHeader = (bodyParams: any, req: Request, name: string): string => {

const getBodyParams = async (stream: NodeJS.ReadableStream) => {
const body = await streamToString(stream);
const decodedBody = parseQueryString(body);
return decodedBody;

return parseQueryString(body);
};

export default async ({ config, method, req, res }: Options) => {
checkUnknownParams(req.query, ['method']);

validateStatementProcessingPriority(req.query.priority as string | undefined);
const priority =
(req.query.priority as StatementProcessingPriority) || StatementProcessingPriority.MEDIUM;

if (method === 'POST' || (method === undefined && config.allowUndefinedMethod)) {
const bodyParams = await getBodyParams(req);

checkContentType(bodyParams);

const auth = getHeader(bodyParams, req, 'Authorization');
const client = await getClient(config, auth);
const version = getHeader(bodyParams, req, 'X-Experience-API-Version');

validateVersionHeader(version);

const body = getBodyContent(bodyParams);
return storeStatements({ config, client, body, attachments: [], res });

return storeStatements({ config, client, priority, body, attachments: [], res });
}

if (method === 'GET') {
Expand All @@ -65,22 +77,30 @@ export default async ({ config, method, req, res }: Options) => {
const auth = getHeader(bodyParams, req, 'Authorization');
const client = await getClient(config, auth);
const version = getHeader(bodyParams, req, 'X-Experience-API-Version');

validateVersionHeader(version);

const acceptedLangs = defaultTo<string>(req.header('Accept-Language'), '');
const queryParams = bodyParams;

return getStatements({ config, res, client, queryParams, urlPath, acceptedLangs });
}

if (method === 'PUT') {
const bodyParams = await getBodyParams(req);

checkContentType(bodyParams);

const auth = getHeader(bodyParams, req, 'Authorization');
const client = await getClient(config, auth);
const version = getHeader(bodyParams, req, 'X-Experience-API-Version');

validateVersionHeader(version);

const body = getBodyContent(bodyParams);
const queryParams = bodyParams;
return storeStatement({ config, client, body, attachments: [], queryParams, res });
const statementId = bodyParams.statementId as string | undefined;

return storeStatement({ config, client, body, priority, attachments: [], statementId, res });
}

throw new InvalidMethod(method);
Expand Down
9 changes: 8 additions & 1 deletion src/apps/statements/expressPresenter/postStatements/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Request, Response } from 'express';
import { defaultTo } from 'lodash';
import { parse as parseQueryString } from 'query-string';
import streamToString from 'stream-to-string';
import { StatementProcessingPriority } from '../../enums/statementProcessingPriority.enum';
import InvalidContentType from '../../errors/InvalidContentType';
import parseJson from '../../utils/parseJson';
import Config from '../Config';
Expand All @@ -13,6 +14,7 @@ import {
} from '../utils/contentTypePatterns';
import getClient from '../utils/getClient';
import validateVersionHeader from '../utils/validateHeaderVersion';
import { validateStatementProcessingPriority } from '../utils/validateStatementProcessingPriority';
import alternateRequest from './alternateRequest';
import storeStatements from './storeStatements';
import storeWithAttachments from './storeWithAttachments';
Expand All @@ -34,6 +36,11 @@ export default (config: Config) => {
config,
async (req: Request, res: Response): Promise<void> => {
const method = req.query.method as string | undefined;

validateStatementProcessingPriority(req.query.priority as string | undefined);

const priority =
(req.query.priority as StatementProcessingPriority) || StatementProcessingPriority.MEDIUM;
const contentType = defaultTo(req.header('Content-Type'), '');

if (method === undefined && multipartContentTypePattern.test(contentType)) {
Expand All @@ -46,7 +53,7 @@ export default (config: Config) => {

const body = await parseJsonBody(config, req);
const attachments: any[] = [];
return storeStatements({ config, client, body, attachments, res });
return storeStatements({ config, client, priority, body, attachments, res });
}

if (method !== undefined || alternateContentTypePattern.test(contentType)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import { Response } from 'express';
import { StatusCodes } from 'http-status-codes';
import { isArray } from 'lodash';
import { StatementProcessingPriority } from '../../enums/statementProcessingPriority.enum';
import ClientModel from '../../models/ClientModel';
import { xapiHeaderVersion } from '../../utils/constants';
import Config from '../Config';

export interface Options {
readonly config: Config;
readonly client: ClientModel;
readonly priority: StatementProcessingPriority;
readonly body: any;
readonly attachments: any[];
readonly res: Response;
}

export default async ({ config, client, body, attachments, res }: Options) => {
export default async ({ config, client, priority, body, attachments, res }: Options) => {
const models = isArray(body) ? body : [body];
const ids = await config.service.storeStatements({ models, attachments, client });
const ids = await config.service.storeStatements({ priority, models, attachments, client });
res.setHeader('X-Experience-API-Version', xapiHeaderVersion);
res.status(StatusCodes.OK);
res.json(ids);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { Request, Response } from 'express';
import { defaultTo } from 'lodash';
import { StatementProcessingPriority } from '../../enums/statementProcessingPriority.enum';
import Config from '../Config';
import getClient from '../utils/getClient';
import getMultipartStatements from '../utils/getMultipartStatements';
import { validateStatementProcessingPriority } from '../utils/validateStatementProcessingPriority';
import storeStatements from './storeStatements';

export interface Options {
Expand All @@ -13,6 +15,12 @@ export interface Options {

export default async ({ config, req, res }: Options) => {
const client = await getClient(config, defaultTo(req.header('Authorization'), ''));

validateStatementProcessingPriority(req.query.priority as string | undefined);

const priority =
(req.query.priority as StatementProcessingPriority) || StatementProcessingPriority.MEDIUM;
const { body, attachments } = await getMultipartStatements(req);
return storeStatements({ config, client, body, attachments, res });

return storeStatements({ config, client, priority, body, attachments, res });
};
15 changes: 10 additions & 5 deletions src/apps/statements/expressPresenter/putStatement.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Request, Response } from 'express';
import { defaultTo } from 'lodash';
import streamToString from 'stream-to-string';
import { StatementProcessingPriority } from '../enums/statementProcessingPriority.enum';
import InvalidContentType from '../errors/InvalidContentType';
import AttachmentModel from '../models/AttachmentModel';
import parseJson from '../utils/parseJson';
Expand All @@ -11,26 +12,30 @@ import getClient from './utils/getClient';
import getMultipartStatements from './utils/getMultipartStatements';
import storeStatement from './utils/storeStatement';
import validateVersionHeader from './utils/validateHeaderVersion';
import { validateStatementProcessingPriority } from './utils/validateStatementProcessingPriority';

export default (config: Config) => {
return catchErrors(
config,
async (req: Request, res: Response): Promise<void> => {
const contentType = defaultTo(req.header('Content-Type'), '');
const client = await getClient(config, defaultTo(req.header('Authorization'), ''));
validateStatementProcessingPriority(req.query.priority as string | undefined);
validateVersionHeader(req.header('X-Experience-API-Version'));

const queryParams = req.query;
const contentType = defaultTo(req.header('Content-Type'), '');
const client = await getClient(config, defaultTo(req.header('Authorization'), ''));
const priority =
(req.query.priority as StatementProcessingPriority) || StatementProcessingPriority.MEDIUM;
const statementId = req.query.statementId as string;

if (multipartContentTypePattern.test(contentType)) {
const { body, attachments } = await getMultipartStatements(req);
return storeStatement({ config, body, attachments, client, queryParams, res });
return storeStatement({ config, priority, body, attachments, client, statementId, res });
}

if (jsonContentTypePattern.test(contentType)) {
const body = parseJson(await streamToString(req), ['body']);
const attachments: AttachmentModel[] = [];
return storeStatement({ config, body, attachments, client, queryParams, res });
return storeStatement({ config, priority, body, attachments, client, statementId, res });
}

throw new InvalidContentType(contentType);
Expand Down
22 changes: 18 additions & 4 deletions src/apps/statements/expressPresenter/utils/storeStatement.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Response } from 'express';
import { StatusCodes } from 'http-status-codes';
import { StatementProcessingPriority } from '../../enums/statementProcessingPriority.enum';
import MissingStatementId from '../../errors/MissingStatementId';
import UnequalStatementId from '../../errors/UnequalStatementId';
import AttachmentModel from '../../models/AttachmentModel';
Expand All @@ -9,29 +10,42 @@ import Config from '../Config';

export interface Options {
readonly config: Config;
readonly priority: StatementProcessingPriority;
readonly body: any;
readonly attachments: AttachmentModel[];
readonly client: ClientModel;
readonly queryParams: any;
readonly statementId?: string;
readonly res: Response;
}

export default async ({ config, body, attachments, client, queryParams, res }: Options) => {
const statementId = queryParams.statementId;
export default async ({
config,
priority,
body,
attachments,
client,
statementId,
res,
}: Options) => {
if (statementId === undefined) {
throw new MissingStatementId();
}

if (body.id !== undefined && body.id !== statementId) {
throw new UnequalStatementId(statementId);
}

const models = [
{
...body,
id: statementId, // Ensures the id is set to the given id.
},
];
await config.service.storeStatements({ models, attachments, client });

await config.service.storeStatements({ priority, models, attachments, client });

res.setHeader('X-Experience-API-Version', xapiHeaderVersion);
res.status(StatusCodes.NO_CONTENT);

res.send();
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { createWarning, Warnings } from 'rulr';

import { StatementProcessingPriority } from '../../enums/statementProcessingPriority.enum';

export const validateStatementProcessingPriority = (
statementProcessingPriority: string | undefined,
): void | Warnings[] => {
if (
statementProcessingPriority &&
!Object.values(StatementProcessingPriority).includes(
statementProcessingPriority as StatementProcessingPriority,
)
) {
const warnings = [createWarning(statementProcessingPriority, ['query', 'priority'])];
throw new Warnings({}, ['query'], warnings);
}
};
2 changes: 2 additions & 0 deletions src/apps/statements/models/UnstoredStatementModel.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { StatementProcessingPriority } from '../enums/statementProcessingPriority.enum';
import Statement from './Statement';

export interface Ref {
Expand All @@ -9,6 +10,7 @@ interface UnstoredStatementModel {
readonly organisation: string;
readonly client: string;
readonly lrs_id: string;
readonly priority: StatementProcessingPriority;
readonly person: string | null;
readonly active: boolean;
readonly voided: boolean;
Expand Down
19 changes: 15 additions & 4 deletions src/apps/statements/repo/eventsRepo/clearRepo/redis.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
import { StatementProcessingPriority } from '../../../enums/statementProcessingPriority.enum';
import { EVENT_NAME } from '../utils/constants';
import { getPrefixWithProcessingPriority } from '../utils/getPrefixWithProcessingPriority';
import FacadeConfig from '../utils/redisEvents/FacadeConfig';

const EVENT_NAME = 'statement.new';

export default (config: FacadeConfig) => {
return async (): Promise<void> => {
const client = await config.client();
const listName = `${config.prefix}:${EVENT_NAME}`;
await client.del(listName);

await Promise.all(
Object.values(StatementProcessingPriority).map((statementProcessingPriority) => {
const listName = `${getPrefixWithProcessingPriority(
config.prefix,
statementProcessingPriority,
config.isQueuePriorityEnabled,
)}:${EVENT_NAME}`;

return client.del(listName);
}),
);
};
};
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import Member from 'jscommons/dist/utils/Member';
import { StatementProcessingPriority } from '../../../enums/statementProcessingPriority.enum';

export interface Opts {
readonly statementProperties: string[];
readonly priority: StatementProcessingPriority;
}

type Signature = Member<Opts, void>;
Expand Down
18 changes: 12 additions & 6 deletions src/apps/statements/repo/eventsRepo/emitNewStatements/redis.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import { CHANNEL_NAME, EVENT_NAME } from '../utils/constants';
import { getPrefixWithProcessingPriority } from '../utils/getPrefixWithProcessingPriority';
import FacadeConfig from '../utils/redisEvents/FacadeConfig';
import Signature from './Signature';

const EVENT_NAME = 'statement.new';
const CHANNEL_NAME = 'statement.notify';

export default (config: FacadeConfig): Signature => {
return async ({ statementProperties }) => {
return async ({ statementProperties, priority }) => {
const client = await config.client();
const listName = `${config.prefix}:${EVENT_NAME}`;
const channelName = `${config.prefix}:${CHANNEL_NAME}`;

const prefixWithPriority = getPrefixWithProcessingPriority(
config.prefix,
priority,
config.isQueuePriorityEnabled,
);
const listName = `${prefixWithPriority}:${EVENT_NAME}`;
const channelName = `${prefixWithPriority}:${CHANNEL_NAME}`;

await client.rpush(listName, ...statementProperties);
client.publish(channelName, '');
};
Expand Down
Loading

0 comments on commit e615e66

Please sign in to comment.