From b8675da0afab69ba0aa81b42d517723343404887 Mon Sep 17 00:00:00 2001 From: Leonardo Rivera Date: Fri, 17 Jan 2025 17:14:09 -0500 Subject: [PATCH] bulk operation --- packages/common/src/types/index.ts | 9 +- packages/common/src/types/service.ts | 72 +++++++---- packages/indexer-client/src/client/index.ts | 13 +- .../indexer-client/src/client/v7/client.ts | 13 +- .../src/client/v7/operations.ts | 57 +++++++-- .../indexer-client/src/client/v8/client.ts | 13 +- .../src/client/v8/operations.ts | 53 +++++++- .../indexer-client/test/createClient.spec.ts | 20 +-- .../test/integration/bulk/create.spec.ts | 95 +++++++++++++++ .../test/integration/bulk/delete.spec.ts | 113 +++++++++++++++++ .../test/integration/bulk/update.spec.ts | 115 ++++++++++++++++++ .../test/integration/bulk/upsert.spec.ts | 95 +++++++++++++++ ...IndexedData.spec.ts => deleteData.spec.ts} | 43 ++++--- .../test/integration/index.spec.ts | 34 +++++- packages/maestro-provider/src/api/api.ts | 8 +- 15 files changed, 677 insertions(+), 76 deletions(-) create mode 100644 packages/indexer-client/test/integration/bulk/create.spec.ts create mode 100644 packages/indexer-client/test/integration/bulk/delete.spec.ts create mode 100644 packages/indexer-client/test/integration/bulk/update.spec.ts create mode 100644 packages/indexer-client/test/integration/bulk/upsert.spec.ts rename packages/indexer-client/test/integration/{bulkUpsertIndexedData.spec.ts => deleteData.spec.ts} (69%) diff --git a/packages/common/src/types/index.ts b/packages/common/src/types/index.ts index efebf47..2089181 100644 --- a/packages/common/src/types/index.ts +++ b/packages/common/src/types/index.ts @@ -9,4 +9,11 @@ export { export { DataRecordNested, DataRecordValue, FailureData, IndexResult } from './dataRecord.js'; export { ConsoleLike, LoggerConfig } from './logger.js'; export { Repository, RepositoryIndexingOperations } from './repository.js'; -export { ElasticsearchService } from './service.js'; +export { + BulkAction, + 
CreateBulkRequest, + DeleteBulkRequest, + ElasticsearchService, + UpdateBulkRequest, + UpsertBulkRequest, +} from './service.js'; diff --git a/packages/common/src/types/service.ts b/packages/common/src/types/service.ts index 807bc74..a297945 100644 --- a/packages/common/src/types/service.ts +++ b/packages/common/src/types/service.ts @@ -1,9 +1,55 @@ import type { DataRecordNested, IndexResult } from './dataRecord.js'; +export const BulkAction = { + DELETE: 'delete', + UPSERT: 'upsert', + CREATE: 'create', + UPDATE: 'update', +} as const; + +export type CreateBulkRequest = { + action: typeof BulkAction.CREATE; + dataSet: DataRecordNested; +}; + +export type DeleteBulkRequest = { + action: typeof BulkAction.DELETE; + id: string; +}; + +export type UpdateBulkRequest = { + action: typeof BulkAction.UPDATE; + dataSet: DataRecordNested; +}; + +export type UpsertBulkRequest = { + action: typeof BulkAction.UPSERT; + dataSet: DataRecordNested; +}; + /** * Interface defining the contract for Elasticsearch service operations. */ export interface ElasticsearchService { + /** + * Indexes data into a specified Elasticsearch index. + * + * @param index - The name of the index where the data will be stored. + * @param data - The data to be indexed. + * @returns A promise that resolves to the result of the indexing operation. + */ + addData(index: string, data: DataRecordNested): Promise; + + /** + * Performs bulk operations (create, update, delete, upsert) on documents in the specified index. + * @param index The name of the index on which the bulk operations will be performed + * @param request An array of bulk requests; each entry's `action` selects the operation to perform. + */ + bulk( + index: string, + request: (CreateBulkRequest | UpdateBulkRequest | DeleteBulkRequest | UpsertBulkRequest)[], + ): Promise; + /** * Creates an index in Elasticsearch. * @@ -13,13 +59,13 @@ export interface ElasticsearchService { createIndex(index: string): Promise; /** - * Indexes data into a specified Elasticsearch index.
+ * Deletes a document from a specified Elasticsearch index. * - * @param index - The name of the index where the data will be stored. - * @param data - The data to be indexed. - * @returns A promise that resolves to the result of the indexing operation. + * @param index - The name of the index from which the document will be deleted. + * @param id - The ID of the document to delete. + * @returns A promise that resolves to the result of the deletion operation. */ - addData(index: string, data: DataRecordNested): Promise; + deleteData(index: string, id: string): Promise; /** * Checks the availability of the Elasticsearch service. @@ -37,20 +83,4 @@ export interface ElasticsearchService { * @returns A promise that resolves to the result of the update operation. */ updateData(index: string, id: string, data: DataRecordNested): Promise; - - /** - * Deletes a document from a specified Elasticsearch index. - * - * @param index - The name of the index from which the document will be deleted. - * @param id - The ID of the document to delete. - * @returns A promise that resolves to the result of the deletion operation. - */ - deleteData(index: string, id: string): Promise; - - /** - * Performs a bulk upsert operation to index or update multiple documents in the specified index. - * @param index The name of the index where the documents will be upserted - * @param data An array of data records to be upserted. 
- */ - bulkUpsert(index: string, data: DataRecordNested[]): Promise; } diff --git a/packages/indexer-client/src/client/index.ts b/packages/indexer-client/src/client/index.ts index 5167d41..b78f1e8 100644 --- a/packages/indexer-client/src/client/index.ts +++ b/packages/indexer-client/src/client/index.ts @@ -1,4 +1,12 @@ -import { type DataRecordNested, ElasticSearchConfig, ElasticsearchService } from '@overture-stack/maestro-common'; +import { + type CreateBulkRequest, + type DataRecordNested, + type DeleteBulkRequest, + ElasticSearchConfig, + ElasticsearchService, + type UpdateBulkRequest, + type UpsertBulkRequest, +} from '@overture-stack/maestro-common'; import { es7 } from './v7/client.js'; import { es8 } from './v8/client.js'; @@ -25,7 +33,8 @@ export const clientProvider = (elasticSearchConfig: ElasticSearchConfig): Elasti return { addData: (index: string, data: DataRecordNested) => service.addData(index, data), - bulkUpsert: (index: string, data: DataRecordNested[]) => service.bulkUpsert(index, data), + bulk: (index: string, request: (CreateBulkRequest | UpdateBulkRequest | DeleteBulkRequest | UpsertBulkRequest)[]) => + service.bulk(index, request), createIndex: (index: string) => service.createIndex(index), deleteData: (index: string, id: string) => service.deleteData(index, id), ping: () => service.ping(), diff --git a/packages/indexer-client/src/client/v7/client.ts b/packages/indexer-client/src/client/v7/client.ts index 12a77b8..a2a2353 100644 --- a/packages/indexer-client/src/client/v7/client.ts +++ b/packages/indexer-client/src/client/v7/client.ts @@ -1,14 +1,18 @@ import { Client } from 'es7'; import type { + CreateBulkRequest, DataRecordNested, + DeleteBulkRequest, ElasticSearchConfig, ElasticsearchService, IndexResult, + UpdateBulkRequest, + UpsertBulkRequest, } from '@overture-stack/maestro-common'; import { getAuth } from '../../common/config.js'; -import { bulkUpsert, createIndexIfNotExists, deleteData, indexData, ping, updateData } from 
'./operations.js'; +import { bulk, createIndexIfNotExists, deleteData, indexData, ping, updateData } from './operations.js'; /** * Creates an instance of the Elasticsearch service for version 7. @@ -40,8 +44,11 @@ export const es7 = (config: ElasticSearchConfig): ElasticsearchService => { return indexData(client, index, data); }, - async bulkUpsert(index: string, data: DataRecordNested[]): Promise { - return bulkUpsert(client, index, data); + async bulk( + index: string, + request: (CreateBulkRequest | UpdateBulkRequest | DeleteBulkRequest | UpsertBulkRequest)[], + ): Promise { + return bulk(client, index, request); }, async createIndex(index: string): Promise { diff --git a/packages/indexer-client/src/client/v7/operations.ts b/packages/indexer-client/src/client/v7/operations.ts index 693f884..82617c3 100644 --- a/packages/indexer-client/src/client/v7/operations.ts +++ b/packages/indexer-client/src/client/v7/operations.ts @@ -1,23 +1,60 @@ import type { Client } from 'es7'; import type { BulkOperationType, BulkResponseItem } from 'es7/api/types'; -import { type DataRecordNested, type FailureData, IndexResult, logger } from '@overture-stack/maestro-common'; +import { + BulkAction, + type CreateBulkRequest, + type DataRecordNested, + type DeleteBulkRequest, + type FailureData, + IndexResult, + logger, + type UpdateBulkRequest, + type UpsertBulkRequest, +} from '@overture-stack/maestro-common'; /** - * Indexes the specified document. If the document exists, replaces the document and increments the version. + * Performs bulk operations (e.g., delete, upsert, create, update) on a specified index in Elasticsearch. 
* - * @param client An instance of the Elasticsearch `Client` used to perform the indexing operation - * @param index The name of the Elasticsearch index to create - * @param dataSet The actual data to be stored in the document - * @returns + * @param client The Elasticsearch client used to perform the bulk operations + * @param index The name of the index on which to perform the bulk operations + * @param bulkRequest An array of bulk request objects representing the actions to be performed. + * The array can include upsert, delete, create, and update actions. Each action contains specific data + * that will be used for the corresponding operation. + * + * @returns A Promise that resolves to an `IndexResult` object. The result includes: + * - `indexName`: The name of the index on which the operations were performed. + * - `successful`: A boolean indicating whether all operations were successful. + * - `failureData`: An object that maps the index of failed operations to the error details. If no operations fail, this object is empty. 
*/ -export const bulkUpsert = async (client: Client, index: string, dataSet: DataRecordNested[]) => { +export const bulk = async ( + client: Client, + index: string, + bulkRequest: (UpsertBulkRequest | DeleteBulkRequest | CreateBulkRequest | UpdateBulkRequest)[], +): Promise => { try { - const body = dataSet.flatMap((doc) => [{ index: { _index: index, _id: doc?.['id'] } }, doc]); + const body = bulkRequest.flatMap((val) => { + switch (val.action) { + case BulkAction.DELETE: + return [{ delete: { _index: index, _id: val.id } }]; + case BulkAction.UPSERT: { + const doc = val.dataSet; + return [{ index: { _index: index, _id: doc?.['id'] } }, doc]; + } + case BulkAction.CREATE: { + const doc = val.dataSet; + return [{ create: { _index: index, _id: doc?.['id'] } }, doc]; + } + case BulkAction.UPDATE: { + const doc = val.dataSet; + return [{ update: { _index: index, _id: doc?.['id'] } }, { doc }]; + } + } + }); const response = await client.bulk({ refresh: true, body }); - logger.debug(`Bulk upsert in index:'${index}'`, `# of documents:'${dataSet.length}'`, response.statusCode); + logger.debug(`Bulk actions in index:'${index}'`, `# of documents:'${bulkRequest.length}'`, response.statusCode); let successful = false; const failureData: FailureData = {}; @@ -45,7 +82,7 @@ export const bulkUpsert = async (client: Client, index: string, dataSet: DataRec } catch (error) { let errorMessage = JSON.stringify(error); - logger.error(`Error update doc: ${errorMessage}`); + logger.error(`Error bulk action: ${errorMessage}`); if (typeof error === 'object' && error && 'name' in error && typeof error.name === 'string') { errorMessage = error.name; diff --git a/packages/indexer-client/src/client/v8/client.ts b/packages/indexer-client/src/client/v8/client.ts index 004a0ed..e941049 100644 --- a/packages/indexer-client/src/client/v8/client.ts +++ b/packages/indexer-client/src/client/v8/client.ts @@ -1,14 +1,18 @@ import { Client } from 'es8'; import type { + CreateBulkRequest, 
DataRecordNested, + DeleteBulkRequest, ElasticSearchConfig, ElasticsearchService, IndexResult, + UpdateBulkRequest, + UpsertBulkRequest, } from '@overture-stack/maestro-common'; import { getAuth } from '../../common/config.js'; -import { bulkUpsert, createIndexIfNotExists, deleteData, indexData, ping, updateData } from './operations.js'; +import { bulk, createIndexIfNotExists, deleteData, indexData, ping, updateData } from './operations.js'; /** * Creates an instance of the Elasticsearch service for version 8. @@ -39,8 +43,11 @@ export const es8 = (config: ElasticSearchConfig): ElasticsearchService => { async addData(index: string, data: DataRecordNested): Promise { return indexData(client, index, data); }, - async bulkUpsert(index: string, data: DataRecordNested[]): Promise { - return bulkUpsert(client, index, data); + async bulk( + index: string, + request: (CreateBulkRequest | UpdateBulkRequest | DeleteBulkRequest | UpsertBulkRequest)[], + ): Promise { + return bulk(client, index, request); }, async createIndex(index: string): Promise { diff --git a/packages/indexer-client/src/client/v8/operations.ts b/packages/indexer-client/src/client/v8/operations.ts index 7c5f624..3d16abc 100644 --- a/packages/indexer-client/src/client/v8/operations.ts +++ b/packages/indexer-client/src/client/v8/operations.ts @@ -1,15 +1,60 @@ import type { Client } from 'es8'; import type { BulkOperationType, BulkResponseItem } from 'es8/lib/api/types.js'; -import { type DataRecordNested, FailureData, IndexResult, logger } from '@overture-stack/maestro-common'; +import { + BulkAction, + type CreateBulkRequest, + type DataRecordNested, + type DeleteBulkRequest, + FailureData, + IndexResult, + logger, + type UpdateBulkRequest, + type UpsertBulkRequest, +} from '@overture-stack/maestro-common'; -export const bulkUpsert = async (client: Client, index: string, dataSet: DataRecordNested[]) => { +/** + * Performs bulk operations (e.g., delete, upsert, create, update) on a specified index in 
Elasticsearch. + * + * @param client The Elasticsearch client used to perform the bulk operations + * @param index The name of the index on which to perform the bulk operations + * @param bulkRequest An array of bulk request objects representing the actions to be performed. + * The array can include upsert, delete, create, and update actions. Each action contains specific data + * that will be used for the corresponding operation. + * + * @returns A Promise that resolves to an `IndexResult` object. The result includes: + * - `indexName`: The name of the index on which the operations were performed. + * - `successful`: A boolean indicating whether all operations were successful. + * - `failureData`: An object that maps the index of failed operations to the error details. If no operations fail, this object is empty. + */ +export const bulk = async ( + client: Client, + index: string, + bulkRequest: (UpsertBulkRequest | DeleteBulkRequest | CreateBulkRequest | UpdateBulkRequest)[], +): Promise => { try { - const body = dataSet.flatMap((doc) => [{ index: { _index: index, _id: doc?.['id'] } }, doc]); + const body = bulkRequest.flatMap((val) => { + switch (val.action) { + case BulkAction.DELETE: + return [{ delete: { _index: index, _id: val.id } }]; + case BulkAction.UPSERT: { + const doc = val.dataSet; + return [{ index: { _index: index, _id: doc?.['id'] } }, doc]; + } + case BulkAction.CREATE: { + const doc = val.dataSet; + return [{ create: { _index: index, _id: doc?.['id'] } }, doc]; + } + case BulkAction.UPDATE: { + const doc = val.dataSet; + return [{ update: { _index: index, _id: doc?.['id'] } }, { doc }]; + } + } + }); const response = await client.bulk({ refresh: true, body }); - logger.debug(`Bulk upsert in index:'${index}'`, `# of documents: ${response.items.length}`); + logger.debug(`Bulk action in index:'${index}'`, `# of documents: ${response.items.length}`); let successful = false; const failureData: FailureData = {}; diff --git 
a/packages/indexer-client/test/createClient.spec.ts b/packages/indexer-client/test/createClient.spec.ts index 8d1557f..d5a4ec1 100644 --- a/packages/indexer-client/test/createClient.spec.ts +++ b/packages/indexer-client/test/createClient.spec.ts @@ -10,21 +10,23 @@ describe('Initialize Indexer', () => { const nodeUrl = 'http://myserver:9200'; const providerConfig = { nodes: nodeUrl, basicAuth: { enabled: false }, version: 7 }; const provider = clientProvider(providerConfig); - expect(provider).to.have.property('createIndex'); expect(provider).to.have.property('addData'); - expect(provider).to.have.property('updateData'); + expect(provider).to.have.property('bulk'); + expect(provider).to.have.property('createIndex'); expect(provider).to.have.property('deleteData'); expect(provider).to.have.property('ping'); + expect(provider).to.have.property('updateData'); }); it('should create a provider V8 with indexing functions', () => { const nodeUrl = 'http://myserver:9200'; const providerConfig = { nodes: nodeUrl, basicAuth: { enabled: false }, version: 8 }; const provider = clientProvider(providerConfig); - expect(provider).to.have.property('createIndex'); expect(provider).to.have.property('addData'); - expect(provider).to.have.property('updateData'); + expect(provider).to.have.property('bulk'); + expect(provider).to.have.property('createIndex'); expect(provider).to.have.property('deleteData'); expect(provider).to.have.property('ping'); + expect(provider).to.have.property('updateData'); }); it('should throw an error if configuration version is incorrect', () => { try { @@ -41,21 +43,23 @@ describe('Initialize Indexer', () => { const nodeUrl = 'http://myserver:9200'; const client = es7({ nodes: nodeUrl, basicAuth: { enabled: false }, version: 7 }); - expect(client).to.have.property('createIndex'); expect(client).to.have.property('addData'); - expect(client).to.have.property('updateData'); + expect(client).to.have.property('bulk'); + 
expect(client).to.have.property('createIndex'); expect(client).to.have.property('deleteData'); expect(client).to.have.property('ping'); + expect(client).to.have.property('updateData'); }); it('should create a client V8 with indexing functions', () => { const nodeUrl = 'http://myserver:9200'; const client = es8({ nodes: nodeUrl, basicAuth: { enabled: false }, version: 8 }); - expect(client).to.have.property('createIndex'); expect(client).to.have.property('addData'); - expect(client).to.have.property('updateData'); + expect(client).to.have.property('bulk'); + expect(client).to.have.property('createIndex'); expect(client).to.have.property('deleteData'); expect(client).to.have.property('ping'); + expect(client).to.have.property('updateData'); }); it('should throw an error if client V7 configuration version is incorrect', () => { const nodeUrl = 'http://myserver:9200'; diff --git a/packages/indexer-client/test/integration/bulk/create.spec.ts b/packages/indexer-client/test/integration/bulk/create.spec.ts new file mode 100644 index 0000000..3cf4f93 --- /dev/null +++ b/packages/indexer-client/test/integration/bulk/create.spec.ts @@ -0,0 +1,95 @@ +import { ElasticsearchContainer, StartedElasticsearchContainer } from '@testcontainers/elasticsearch'; +import { expect } from 'chai'; + +import { + BulkAction, + type CreateBulkRequest, + type DataRecordNested, + type ElasticsearchService, +} from '@overture-stack/maestro-common'; + +import { es7 } from '../../../src/client/v7/client.js'; +import { es8 } from '../../../src/client/v8/client.js'; + +export default function suite() { + let container: StartedElasticsearchContainer; + let client: ElasticsearchService; + + before(async () => { + // Start an Elasticsearch container + container = await new ElasticsearchContainer(this.ctx.dockerImage) + .withEnvironment({ + 'xpack.security.enabled': 'false', + }) + .start(); + + // Get the connection details for the running container + const esHost = container.getHttpUrl(); + const 
clientVersion = this.ctx.clientVersion; + + // Initialize our client wrapper + if (clientVersion === 7) { + client = es7({ nodes: esHost, version: 7, basicAuth: { enabled: false } }); + } else if (clientVersion === 8) { + client = es8({ nodes: esHost, version: 8, basicAuth: { enabled: false } }); + } + + // Wait for Elasticsearch to be ready + await new Promise((resolve) => setTimeout(resolve, 5000)); // Add a delay to ensure Elasticsearch is ready + }); + + after(async () => { + // Stop the container after tests are complete + await container.stop(); + }); + + it('should return successful true when bulk creating data', async () => { + const indexName = 'test-index'; + + // Create Data + const records: DataRecordNested[] = [ + { id: 1, name: 'value1' }, + { id: 2, name: 'value2' }, + { id: 3, name: 'value3' }, + ]; + const bulkRequest: CreateBulkRequest[] = records.map((data) => ({ action: BulkAction.CREATE, dataSet: data })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(true); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(0); + }); + + it('should return successful true when doc id does not exists', async () => { + const indexName = 'test-index'; + + // Create Data + const records: DataRecordNested[] = [{ name: 'value1' }, { name: 'value2' }, { name: 'value3' }]; + const bulkRequest: CreateBulkRequest[] = records.map((data) => ({ action: BulkAction.CREATE, dataSet: data })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(true); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(0); + }); + + it('should return successful false when a ConnectionError is thrown', async () => { + const indexName = 'test-index'; + + // Setting an invalid node url to throw a Connection Error + if (this.ctx.clientVersion === 7) { + client = es7({ nodes: 'http://unknown', version: 7, 
basicAuth: { enabled: false } }); + } else if (this.ctx.clientVersion === 8) { + client = es8({ nodes: 'http://unknown', version: 8, basicAuth: { enabled: false } }); + } + + const records: DataRecordNested[] = [{ name: 'value1' }, { name: 'value2' }, { name: 'value3' }]; + const bulkRequest: CreateBulkRequest[] = records.map((data) => ({ action: BulkAction.CREATE, dataSet: data })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(false); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(1); + expect(result.failureData).to.eql({ error: ['ConnectionError'] }); + }); +} diff --git a/packages/indexer-client/test/integration/bulk/delete.spec.ts b/packages/indexer-client/test/integration/bulk/delete.spec.ts new file mode 100644 index 0000000..79621ee --- /dev/null +++ b/packages/indexer-client/test/integration/bulk/delete.spec.ts @@ -0,0 +1,113 @@ +import { ElasticsearchContainer, StartedElasticsearchContainer } from '@testcontainers/elasticsearch'; +import { expect } from 'chai'; + +import { + BulkAction, + type CreateBulkRequest, + type DataRecordNested, + type DeleteBulkRequest, + type ElasticsearchService, +} from '@overture-stack/maestro-common'; + +import { es7 } from '../../../src/client/v7/client.js'; +import { es8 } from '../../../src/client/v8/client.js'; + +export default function suite() { + let container: StartedElasticsearchContainer; + let client: ElasticsearchService; + + before(async () => { + // Start an Elasticsearch container + container = await new ElasticsearchContainer(this.ctx.dockerImage) + .withEnvironment({ + 'xpack.security.enabled': 'false', + }) + .start(); + + // Get the connection details for the running container + const esHost = container.getHttpUrl(); + const clientVersion = this.ctx.clientVersion; + + // Initialize our client wrapper + if (clientVersion === 7) { + client = es7({ nodes: esHost, version: 7, basicAuth: { enabled: false } 
}); + } else if (clientVersion === 8) { + client = es8({ nodes: esHost, version: 8, basicAuth: { enabled: false } }); + } + + // Wait for Elasticsearch to be ready + await new Promise((resolve) => setTimeout(resolve, 5000)); // Add a delay to ensure Elasticsearch is ready + }); + + after(async () => { + // Stop the container after tests are complete + await container.stop(); + }); + + it('should return successful true when bulk delete data', async () => { + const indexName = 'test-index'; + + // Create Data + const records: DataRecordNested[] = [ + { id: 1, name: 'value1' }, + { id: 2, name: 'value2' }, + { id: 3, name: 'value3' }, + ]; + const bulkRequest: CreateBulkRequest[] = records.map((data) => ({ action: BulkAction.CREATE, dataSet: data })); + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(true); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(0); + + // Delete Data + const idsToDelete: string[] = ['1', '2', '3']; + const bulkRequestDelete: DeleteBulkRequest[] = idsToDelete.map((id) => ({ + action: BulkAction.DELETE, + id, + })); + + const resultDelete = await client.bulk(indexName, bulkRequestDelete); + expect(resultDelete.successful).to.eql(true); + expect(resultDelete.indexName).to.eql(indexName); + expect(Object.keys(resultDelete.failureData).length).to.eq(0); + }); + + it('should return successful true when doc records to delete does not exists', async () => { + const indexName = 'test-index'; + + // Delete Data + const doesNotExistIds: string[] = ['1', '2', '3']; + const bulkRequest: DeleteBulkRequest[] = doesNotExistIds.map((id) => ({ + action: BulkAction.DELETE, + id, + })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(true); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(0); + }); + + it('should return successful false when a ConnectionError is 
thrown', async () => { + const indexName = 'test-index'; + + // Setting an invalid node url to throw a Connection Error + if (this.ctx.clientVersion === 7) { + client = es7({ nodes: 'http://unknown', version: 7, basicAuth: { enabled: false } }); + } else if (this.ctx.clientVersion === 8) { + client = es8({ nodes: 'http://unknown', version: 8, basicAuth: { enabled: false } }); + } + + const idsToDelete: string[] = ['1', '2', '3']; + const bulkRequest: DeleteBulkRequest[] = idsToDelete.map((id) => ({ + action: BulkAction.DELETE, + id, + })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(false); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(1); + expect(result.failureData).to.eql({ error: ['ConnectionError'] }); + }); +} diff --git a/packages/indexer-client/test/integration/bulk/update.spec.ts b/packages/indexer-client/test/integration/bulk/update.spec.ts new file mode 100644 index 0000000..7b98dc6 --- /dev/null +++ b/packages/indexer-client/test/integration/bulk/update.spec.ts @@ -0,0 +1,115 @@ +import { ElasticsearchContainer, StartedElasticsearchContainer } from '@testcontainers/elasticsearch'; +import { expect } from 'chai'; + +import { + BulkAction, + type CreateBulkRequest, + type DataRecordNested, + type ElasticsearchService, + type UpdateBulkRequest, +} from '@overture-stack/maestro-common'; + +import { es7 } from '../../../src/client/v7/client.js'; +import { es8 } from '../../../src/client/v8/client.js'; + +export default function suite() { + let container: StartedElasticsearchContainer; + let client: ElasticsearchService; + + before(async () => { + // Start an Elasticsearch container + container = await new ElasticsearchContainer(this.ctx.dockerImage) + .withEnvironment({ + 'xpack.security.enabled': 'false', + }) + .start(); + + // Get the connection details for the running container + const esHost = container.getHttpUrl(); + const clientVersion = 
this.ctx.clientVersion; + + // Initialize our client wrapper + if (clientVersion === 7) { + client = es7({ nodes: esHost, version: 7, basicAuth: { enabled: false } }); + } else if (clientVersion === 8) { + client = es8({ nodes: esHost, version: 8, basicAuth: { enabled: false } }); + } + + // Wait for Elasticsearch to be ready + await new Promise((resolve) => setTimeout(resolve, 5000)); // Add a delay to ensure Elasticsearch is ready + }); + + after(async () => { + // Stop the container after tests are complete + await container.stop(); + }); + + it('should return successful true when bulk creating data', async () => { + const indexName = 'test-index'; + + // Create Data + const records: DataRecordNested[] = [ + { id: 1, name: 'value1' }, + { id: 2, name: 'value2' }, + { id: 3, name: 'value3' }, + ]; + const bulkRequest: CreateBulkRequest[] = records.map((data) => ({ action: BulkAction.CREATE, dataSet: data })); + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(true); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(0); + + // Update Data + const recordsUpdate: DataRecordNested[] = [ + { id: 1, name: 'VALUE1' }, + { id: 2, name: 'VALUE2' }, + { id: 3, name: 'VALUE3' }, + ]; + const bulkRequestUpdate: UpdateBulkRequest[] = recordsUpdate.map((data) => ({ + action: BulkAction.UPDATE, + dataSet: data, + })); + + const resultUpdate = await client.bulk(indexName, bulkRequestUpdate); + expect(resultUpdate.successful).to.eql(true); + expect(resultUpdate.indexName).to.eql(indexName); + expect(Object.keys(resultUpdate.failureData).length).to.eq(0); + }); + + it('should return successful false when doc id does not exists', async () => { + const indexName = 'test-index'; + + // Update Data + const records: DataRecordNested[] = [{ name: 'value1' }, { name: 'value2' }, { name: 'value3' }]; + const bulkRequest: UpdateBulkRequest[] = records.map((data) => ({ + action: BulkAction.UPDATE, 
+ dataSet: data, + })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(false); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(1); + expect(result.failureData).to.eql({ error: ['ResponseError'] }); + }); + + it('should return successful false when a ConnectionError is thrown', async () => { + const indexName = 'test-index'; + + // Setting an invalid node url to throw a Connection Error + if (this.ctx.clientVersion === 7) { + client = es7({ nodes: 'http://unknown', version: 7, basicAuth: { enabled: false } }); + } else if (this.ctx.clientVersion === 8) { + client = es8({ nodes: 'http://unknown', version: 8, basicAuth: { enabled: false } }); + } + + const records: DataRecordNested[] = [{ name: 'value1' }, { name: 'value2' }, { name: 'value3' }]; + const bulkRequest: UpdateBulkRequest[] = records.map((data) => ({ action: BulkAction.UPDATE, dataSet: data })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(false); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(1); + expect(result.failureData).to.eql({ error: ['ConnectionError'] }); + }); +} diff --git a/packages/indexer-client/test/integration/bulk/upsert.spec.ts b/packages/indexer-client/test/integration/bulk/upsert.spec.ts new file mode 100644 index 0000000..b8bd2c3 --- /dev/null +++ b/packages/indexer-client/test/integration/bulk/upsert.spec.ts @@ -0,0 +1,95 @@ +import { ElasticsearchContainer, StartedElasticsearchContainer } from '@testcontainers/elasticsearch'; +import { expect } from 'chai'; + +import { + BulkAction, + type DataRecordNested, + type ElasticsearchService, + type UpsertBulkRequest, +} from '@overture-stack/maestro-common'; + +import { es7 } from '../../../src/client/v7/client.js'; +import { es8 } from '../../../src/client/v8/client.js'; + +export default function suite() { + let container: 
StartedElasticsearchContainer; + let client: ElasticsearchService; + + before(async () => { + // Start an Elasticsearch container + container = await new ElasticsearchContainer(this.ctx.dockerImage) + .withEnvironment({ + 'xpack.security.enabled': 'false', + }) + .start(); + + // Get the connection details for the running container + const esHost = container.getHttpUrl(); + const clientVersion = this.ctx.clientVersion; + + // Initialize our client wrapper + if (clientVersion === 7) { + client = es7({ nodes: esHost, version: 7, basicAuth: { enabled: false } }); + } else if (clientVersion === 8) { + client = es8({ nodes: esHost, version: 8, basicAuth: { enabled: false } }); + } + + // Wait for Elasticsearch to be ready + await new Promise((resolve) => setTimeout(resolve, 5000)); // Add a delay to ensure Elasticsearch is ready + }); + + after(async () => { + // Stop the container after tests are complete + await container.stop(); + }); + + it('should return successful true when bulk upserting data', async () => { + const indexName = 'test-index'; + + // Upsert Data + const records: DataRecordNested[] = [ + { id: 1, name: 'value1' }, + { id: 2, name: 'value2' }, + { id: 3, name: 'value3' }, + ]; + const bulkRequest: UpsertBulkRequest[] = records.map((data) => ({ action: BulkAction.UPSERT, dataSet: data })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(true); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(0); + }); + + it('should return successful true when doc id does not exists', async () => { + const indexName = 'test-index'; + + // Upsert Data + const records: DataRecordNested[] = [{ name: 'value1' }, { name: 'value2' }, { name: 'value3' }]; + const bulkRequest: UpsertBulkRequest[] = records.map((data) => ({ action: BulkAction.UPSERT, dataSet: data })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(true); + 
expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(0); + }); + + it('should return successful false when a ConnectionError is thrown', async () => { + const indexName = 'test-index'; + + // Setting an invalid node url to throw a Connection Error + if (this.ctx.clientVersion === 7) { + client = es7({ nodes: 'http://unknown', version: 7, basicAuth: { enabled: false } }); + } else if (this.ctx.clientVersion === 8) { + client = es8({ nodes: 'http://unknown', version: 8, basicAuth: { enabled: false } }); + } + + const records: DataRecordNested[] = [{ name: 'value1' }, { name: 'value2' }, { name: 'value3' }]; + const bulkRequest: UpsertBulkRequest[] = records.map((data) => ({ action: BulkAction.UPSERT, dataSet: data })); + + const result = await client.bulk(indexName, bulkRequest); + expect(result.successful).to.eql(false); + expect(result.indexName).to.eql(indexName); + expect(Object.keys(result.failureData).length).to.eq(1); + expect(result.failureData).to.eql({ error: ['ConnectionError'] }); + }); +} diff --git a/packages/indexer-client/test/integration/bulkUpsertIndexedData.spec.ts b/packages/indexer-client/test/integration/deleteData.spec.ts similarity index 69% rename from packages/indexer-client/test/integration/bulkUpsertIndexedData.spec.ts rename to packages/indexer-client/test/integration/deleteData.spec.ts index 498e247..a045dba 100644 --- a/packages/indexer-client/test/integration/bulkUpsertIndexedData.spec.ts +++ b/packages/indexer-client/test/integration/deleteData.spec.ts @@ -1,7 +1,7 @@ import { ElasticsearchContainer, StartedElasticsearchContainer } from '@testcontainers/elasticsearch'; import { expect } from 'chai'; -import type { DataRecordNested, ElasticsearchService } from '@overture-stack/maestro-common'; +import { type DataRecordNested, ElasticsearchService } from '@overture-stack/maestro-common'; import { es7 } from '../../src/client/v7/client.js'; import { es8 } from '../../src/client/v8/client.js'; 
@@ -38,32 +38,38 @@ export default function suite() { await container.stop(); }); - it('should return successful true when bulk upserting indexed data', async () => { + it('should return successful true when deleting data', async () => { const indexName = 'test-index'; - // Upsert Data - const data: DataRecordNested[] = [ - { id: 1, name: 'value1' }, - { id: 2, name: 'value2' }, - { id: 3, name: 'value3' }, - ]; + // insert data + const mockData: DataRecordNested = { + id: '1234', + data: { key: 'value' }, + entityName: 'test-entity', + organization: 'test-org', + }; + + await client.addData(indexName, mockData); + + // delete data + const id = '1234'; + const result = await client.deleteData(indexName, id); - const result = await client.bulkUpsert(indexName, data); expect(result.successful).to.eql(true); expect(result.indexName).to.eql(indexName); expect(Object.keys(result.failureData).length).to.eq(0); }); - it('should return successful true when doc id does not exists', async () => { + it('should return successful false when deleting id does not exists', async () => { const indexName = 'test-index'; - // Upsert Data - const data: DataRecordNested[] = [{ name: 'value1' }, { name: 'value2' }, { name: 'value3' }]; + const idDoesNotExists = '890'; - const result = await client.bulkUpsert(indexName, data); - expect(result.successful).to.eql(true); + const result = await client.deleteData(indexName, idDoesNotExists); + expect(result.successful).to.eql(false); expect(result.indexName).to.eql(indexName); - expect(Object.keys(result.failureData).length).to.eq(0); + expect(Object.keys(result.failureData).length).to.eq(1); + expect(result.failureData).to.eql({ '890': ['ResponseError'] }); }); it('should return successful false when a ConnectionError is thrown', async () => { @@ -76,12 +82,11 @@ export default function suite() { client = es8({ nodes: 'http://unknown', version: 8, basicAuth: { enabled: false } }); } - const data: DataRecordNested[] = [{ name: 'value1' }, { 
name: 'value2' }, { name: 'value3' }]; - - const result = await client.bulkUpsert(indexName, data); + const id = '123'; + const result = await client.deleteData(indexName, id); expect(result.successful).to.eql(false); expect(result.indexName).to.eql(indexName); expect(Object.keys(result.failureData).length).to.eq(1); - expect(result.failureData).to.eql({ error: ['ConnectionError'] }); + expect(result.failureData).to.eql({ '123': ['ConnectionError'] }); }); } diff --git a/packages/indexer-client/test/integration/index.spec.ts b/packages/indexer-client/test/integration/index.spec.ts index 708a5f0..ceb15e8 100644 --- a/packages/indexer-client/test/integration/index.spec.ts +++ b/packages/indexer-client/test/integration/index.spec.ts @@ -1,5 +1,9 @@ -import bulkUpsertDataTest from './bulkUpsertIndexedData.spec'; +import bulkCreateDataTest from './bulk/create.spec'; +import bulkDeleteDataTest from './bulk/delete.spec'; +import bulkUpdateDataTest from './bulk/update.spec'; +import bulkUpsertDataTest from './bulk/upsert.spec'; import createdIndexTest from './createdIndex.spec'; +import deteleDataTest from './deleteData.spec'; import indexDataTest from './indexData.spec'; import pingTest from './ping.spec'; import updateIndexDataTest from './updateIndexedData.spec'; @@ -24,7 +28,19 @@ describe('Integration tests', function () { describe('Update Data', function () { updateIndexDataTest.call(this); }); - describe('Bulk upsert Data', function () { + describe('Delete Data', function () { + deteleDataTest.call(this); + }); + describe('Bulk Create Data', function () { + bulkCreateDataTest.call(this); + }); + describe('Bulk Update Data', function () { + bulkUpdateDataTest.call(this); + }); + describe('Bulk Delete Data', function () { + bulkDeleteDataTest.call(this); + }); + describe('Bulk Upsert Data', function () { bulkUpsertDataTest.call(this); }); }); @@ -48,7 +64,19 @@ describe('Integration tests', function () { describe('Update Data', function () { 
updateIndexDataTest.call(this); }); - describe('Bulk upsert Data', function () { + describe('Delete Data', function () { + deteleDataTest.call(this); + }); + describe('Bulk Create Data', function () { + bulkCreateDataTest.call(this); + }); + describe('Bulk Update Data', function () { + bulkUpdateDataTest.call(this); + }); + describe('Bulk Delete Data', function () { + bulkDeleteDataTest.call(this); + }); + describe('Bulk Upsert Data', function () { bulkUpsertDataTest.call(this); }); }); diff --git a/packages/maestro-provider/src/api/api.ts b/packages/maestro-provider/src/api/api.ts index 088ae35..5641627 100644 --- a/packages/maestro-provider/src/api/api.ts +++ b/packages/maestro-provider/src/api/api.ts @@ -1,5 +1,6 @@ import { BadRequest, + BulkAction, type ElasticsearchService, type FailureData, type IndexResult, @@ -8,6 +9,7 @@ import { logger, type MaestroProviderConfig, type RepositoryIndexingOperations, + type UpsertBulkRequest, } from '@overture-stack/maestro-common'; import { getRepoInformation, repository } from '@overture-stack/maestro-repository'; @@ -71,7 +73,8 @@ export const api = (config: MaestroProviderConfig, indexer: ElasticsearchService try { for await (const items of repository(repoInfo).getRepositoryRecords()) { - const result = await indexer.bulkUpsert(repoInfo.indexName, items); + const upsertRequest: UpsertBulkRequest[] = items.map((i) => ({ action: BulkAction.UPSERT, dataSet: i })); + const result = await indexer.bulk(repoInfo.indexName, upsertRequest); mergeResult(resultIndex, result); } } catch (error) { @@ -111,7 +114,8 @@ export const api = (config: MaestroProviderConfig, indexer: ElasticsearchService try { for await (const items of repository(repoInfo).getOrganizationRecords({ organization })) { - const result = await indexer.bulkUpsert(repoInfo.indexName, items); + const upsertRequest: UpsertBulkRequest[] = items.map((i) => ({ action: BulkAction.UPSERT, dataSet: i })); + const result = await indexer.bulk(repoInfo.indexName, 
upsertRequest); mergeResult(resultIndex, result); } } catch (error) {