From 6d28e89e57bba09ecb488528c59c51364455f2a6 Mon Sep 17 00:00:00 2001 From: "Andre Wendel (Q290938)" Date: Thu, 22 Aug 2024 13:14:02 +0200 Subject: [PATCH] Support dots separation for endpoint naming in realm db Signed-off-by: Andre Wendel --- .gitignore | 1 + cdsp/information-layer/handlers/handler.js | 40 ++++-- .../handlers/iotdb/src/iotdb-handler.js | 117 ++++++++++-------- .../iotdb/utils/IoTDBDataInterpreter.js | 34 +---- .../handlers/iotdb/utils/IoTDBRpcDataSets.js | 2 +- .../handlers/realmdb/src/realmdb-handler.js | 76 +++++------- cdsp/information-layer/router/src/.env | 2 +- 7 files changed, 135 insertions(+), 137 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/cdsp/information-layer/handlers/handler.js b/cdsp/information-layer/handlers/handler.js index 7ea3025..41b9a19 100644 --- a/cdsp/information-layer/handlers/handler.js +++ b/cdsp/information-layer/handlers/handler.js @@ -56,16 +56,40 @@ class Handler { * @returns {Object} - The transformed message. */ createOrUpdateMessage(message, nodes, type) { - const transformedNodes = nodes.map((node) => ({ - name: node.name, - value: node.value, - })); - return { - ...message, - nodes: transformedNodes, + const { id, tree, uuid } = message; + let newMessage = { type, - timestamp: new Date().toISOString(), + tree, + id, + dateTime: new Date().toISOString(), + uuid, }; + if (nodes.length === 1) { + newMessage["node"] = nodes[0]; + } else { + newMessage["nodes"] = nodes; + } + return newMessage; + } + + /** + * Transforms a message node by replacing all dots with underscores. + * + * @param {string} node - The message node to transform. + * @returns {string} - The transformed message node with dots replaced by underscores. + */ + transformEndpointFromMessageNode(node) { + return `${node}`.replace(/\./g, "_"); + } + + /** + * Transforms a database field name by replacing underscores with all dots. + * + * @param {string} field - The database filed to transform. + * @returns {string} - The transformed to message node replacing underscores by dots. + */ + transformEndpointFromDatabaseFields(field) { + return `${field}`.replace(/\_/g, "."); } } diff --git a/cdsp/information-layer/handlers/iotdb/src/iotdb-handler.js b/cdsp/information-layer/handlers/iotdb/src/iotdb-handler.js index ed23766..d4c06b3 100644 --- a/cdsp/information-layer/handlers/iotdb/src/iotdb-handler.js +++ b/cdsp/information-layer/handlers/iotdb/src/iotdb-handler.js @@ -14,28 +14,6 @@ const { IoTDBDataInterpreter } = require("../utils/IoTDBDataInterpreter"); const endpointsType = require("../config/endpointsType"); const database = require("../config/databaseParams"); -/** - * Creates an object to insert data in IoTDB based on the provided message. - * - * @param {Object} message - The message containing data to be inserted. - * @returns {Object} The data object to be inserted. - */ -function createObjectToInsert(message) { - const { id, tree } = message; - const data = { [database[tree].endpointId]: id }; - if (message.node) { - data[ - IoTDBDataInterpreter.transformEndpointFromMessageNode(message.node.name) - ] = message.node.value; - } else if (message.nodes) { - message.nodes.forEach((node) => { - data[IoTDBDataInterpreter.transformEndpointFromMessageNode(node.name)] = - node.value; - }); - } - return data; -} - class IoTDBHandler extends Handler { constructor() { super(); @@ -88,17 +66,11 @@ class IoTDBHandler extends Handler { this.sendMessageToClient(ws, responseMessage); } else { console.log("Object not found."); - this.sendMessageToClient( - ws, - JSON.stringify({ error: "Object not found." }) - ); + this.sendMessageToClient(ws, { error: "Object not found." }); } } catch (error) { console.error("Failed to read data from IoTDB: ", error); - this.sendMessageToClient( - ws, - JSON.stringify({ error: "Error reading object" }) - ); + this.sendMessageToClient(ws, { error: "Error reading object" }); } finally { this.#closeSessionIfNeeded(); } @@ -107,7 +79,7 @@ class IoTDBHandler extends Handler { async write(message, ws) { try { await this.#openSessionIfNeeded(); - const data = createObjectToInsert(message); + const data = this.#createObjectToInsert(message); const errorUndefinedTypes = []; let measurements = []; let dataTypes = []; @@ -120,7 +92,6 @@ class IoTDBHandler extends Handler { values.push(value); } else { errorUndefinedTypes.push(`The endpoint "${key}" is not supported`); - continue; } } @@ -161,17 +132,11 @@ class IoTDBHandler extends Handler { this.sendMessageToClients(responseMessage); } else { console.log("Object not found."); - this.sendMessageToClient( - ws, - JSON.stringify({ error: "Object not found." }) - ); + this.sendMessageToClient(ws, { error: "Object not found." }); } } catch (error) { console.error(`Failed writing data to IoTDB. ${error}`); - this.sendMessageToClient( - ws, - JSON.stringify({ error: `Failed writing data. ${error}` }) - ); + this.sendMessageToClient(ws, { error: `Failed writing data. ${error}` }); } finally { this.#closeSessionIfNeeded(); } @@ -197,7 +162,7 @@ class IoTDBHandler extends Handler { try { const resp = await this.client.openSession(openSessionReq); - if (this.protocolVersion != resp.serverProtocolVersion) { + if (this.protocolVersion !== resp.serverProtocolVersion) { console.log( "Protocol differ, Client version is " + this.protocolVersion + @@ -205,7 +170,7 @@ class IoTDBHandler extends Handler { resp.serverProtocolVersion ); // version is less than 0.10 - if (resp.serverProtocolVersion == 0) { + if (resp.serverProtocolVersion === 0) { throw new Error("Protocol not supported."); } } @@ -320,8 +285,8 @@ class IoTDBHandler extends Handler { isAligned = false ) { if ( - values.length != dataTypes.length || - values.length != measurements.length + values.length !== dataTypes.length || + values.length !== measurements.length ) { throw "length of data types does not equal to length of values!"; } @@ -351,15 +316,14 @@ class IoTDBHandler extends Handler { async #queryLastFields(message, ws) { const { id: objectId, tree } = message; const { databaseName, endpointId } = database[tree]; - const fieldsToSearch = - IoTDBDataInterpreter.extractEndpointsFromNodes(message).join(", "); + const fieldsToSearch = this.#extractEndpointsFromNodes(message).join(", "); const sql = `SELECT ${fieldsToSearch} FROM ${databaseName} WHERE ${endpointId} = '${objectId}' ORDER BY Time ASC`; try { const sessionDataSet = await this.#executeQueryStatement(sql); if (!sessionDataSet || Object.keys(sessionDataSet).length === 0) { - throw new Error({ error: "Internal error constructing read object." }); + throw new Error("Internal error constructing read object."); } const mediaElements = []; @@ -368,13 +332,21 @@ class IoTDBHandler extends Handler { } let latestValues = {}; - mediaElements.forEach((mediaElement) => { + // extract underscores from media element key + const transformedMediaElement = Object.fromEntries( + Object.entries(mediaElement).map(([key, value]) => { + const newKey = this.transformEndpointFromDatabaseFields(key); + return [newKey, value]; + }) + ); + const transformedObject = IoTDBDataInterpreter.extractNodesFromTimeseries( - mediaElement, + transformedMediaElement, databaseName ); + Object.entries(transformedObject).forEach(([key, value]) => { if (value !== null) { latestValues[key] = value; @@ -388,11 +360,52 @@ class IoTDBHandler extends Handler { })); } catch (error) { console.error(error); - this.sendMessageToClient( - ws, - JSON.stringify({ error: "Internal error constructing read object." }) + this.sendMessageToClient(ws, { + error: "Internal error constructing read object.", + }); + } + } + + /** + * Extracts endpoint names from the given message. + * + * This function checks if the message has a single node or multiple nodes and + * extracts the names accordingly. + * + * @param {Object} message - The message containing node(s). + * @returns {Array} An array of endpoint names. + */ + #extractEndpointsFromNodes(message) { + let endpoints = []; + + if (message.node) { + endpoints.push(this.transformEndpointFromMessageNode(message.node.name)); + } else if (message.nodes) { + endpoints = message.nodes.map((node) => + this.transformEndpointFromMessageNode(node.name) ); } + return endpoints; + } + + /** + * Creates an object to insert data in IoTDB based on the provided message. + * + * @param {Object} message - The message containing data to be inserted. + * @returns {Object} The data object to be inserted. + */ + #createObjectToInsert(message) { + const { id, tree } = message; + const data = { [database[tree].endpointId]: id }; + if (message.node) { + data[this.transformEndpointFromMessageNode(message.node.name)] = + message.node.value; + } else if (message.nodes) { + message.nodes.forEach((node) => { + data[this.transformEndpointFromMessageNode(node.name)] = node.value; + }); + } + return data; } } diff --git a/cdsp/information-layer/handlers/iotdb/utils/IoTDBDataInterpreter.js b/cdsp/information-layer/handlers/iotdb/utils/IoTDBDataInterpreter.js index e665885..2b06fc4 100644 --- a/cdsp/information-layer/handlers/iotdb/utils/IoTDBDataInterpreter.js +++ b/cdsp/information-layer/handlers/iotdb/utils/IoTDBDataInterpreter.js @@ -88,7 +88,7 @@ class IoTDBDataInterpreter { static extractNodesFromTimeseries(obj, databaseName) { return Object.entries(obj).reduce((acc, [key, value]) => { if (key.startsWith(databaseName)) { - const newKey = key.replace(`${databaseName}.`, "").replace(/_/g, "."); + const newKey = key.replace(`${databaseName}.`, ""); acc[newKey] = value; } return acc; @@ -108,38 +108,6 @@ class IoTDBDataInterpreter { return { timestamp }; } - - /** - * Extracts endpoint names from the given message. - * - * This function checks if the message has a single node or multiple nodes and - * extracts the names accordingly. - * - * @param {Object} message - The message containing node(s). - * @returns {Array} An array of endpoint names. - */ - static extractEndpointsFromNodes(message) { - let endpoints = []; - - if (message.node) { - endpoints.push(this.transformEndpointFromMessageNode(message.node.name)); - } else if (message.nodes) { - endpoints = message.nodes.map((node) => - this.transformEndpointFromMessageNode(node.name) - ); - } - return endpoints; - } - - /** - * Transforms a message node by replacing all dots with underscores. - * - * @param {string} node - The message node to transform. - * @returns {string} - The transformed message node with dots replaced by underscores. - */ - static transformEndpointFromMessageNode(node) { - return `${node}`.replace(/\./g, "_"); - } } module.exports = { IoTDBDataInterpreter }; diff --git a/cdsp/information-layer/handlers/iotdb/utils/IoTDBRpcDataSets.js b/cdsp/information-layer/handlers/iotdb/utils/IoTDBRpcDataSets.js index 8eda7a3..62bff90 100644 --- a/cdsp/information-layer/handlers/iotdb/utils/IoTDBRpcDataSets.js +++ b/cdsp/information-layer/handlers/iotdb/utils/IoTDBRpcDataSets.js @@ -188,7 +188,7 @@ class IoTDBRpcDataSet { * @returns {boolean} True if there is a cached result, otherwise false. */ #hasCachedResult() { - return this.#queryDataSet !== null && this.#queryDataSet.time.length != 0; + return this.#queryDataSet !== null && this.#queryDataSet.time.length !== 0; } /** diff --git a/cdsp/information-layer/handlers/realmdb/src/realmdb-handler.js b/cdsp/information-layer/handlers/realmdb/src/realmdb-handler.js index ba03fc6..db7038c 100644 --- a/cdsp/information-layer/handlers/realmdb/src/realmdb-handler.js +++ b/cdsp/information-layer/handlers/realmdb/src/realmdb-handler.js @@ -8,26 +8,6 @@ const { v4: uuidv4 } = require("uuid"); const app = new Realm.App({ id: config.realmAppId }); const credentials = Realm.Credentials.apiKey(config.realmApiKey); -/** - * Parses the response from a read event. - * - * @param {Object} message - The message object containing node or nodes information. - * @param {Object} queryResponseObj - The query response object containing values to be mapped. - * @returns {Object} - A data object with keys from the message nodes and values from the query response. - */ -function parseReadResponse(message, queryResponseObj) { - const data = []; - const nodes = message.node ? [message.node] : message.nodes; - nodes.forEach((node) => { - const prop = node.name; - data.push({ - name: prop, - value: queryResponseObj[prop], - }); - }); - return data; -} - /** * Parses the response from a media element change event. * @@ -77,13 +57,10 @@ class RealmDBHandler extends Handler { try { const updateMessage = await this.#getMessageData(message, ws); console.log(updateMessage); - this.sendMessageToClient(ws, JSON.stringify(updateMessage)); + this.sendMessageToClient(ws, updateMessage); } catch (error) { console.error("Error reading object from Realm:", error); - this.sendMessageToClient( - ws, - JSON.stringify({ error: "Error reading object" }) - ); + this.sendMessageToClient(ws, { error: "Error reading object" }); } } @@ -96,8 +73,9 @@ class RealmDBHandler extends Handler { if (mediaElement) { this.realm.write(() => { nodes.forEach(({ name, value }) => { + const prop = this.transformEndpointFromMessageNode(name); endpoints.push(name); - mediaElement[name] = value; + mediaElement[prop] = value; }); }); } else { @@ -105,8 +83,9 @@ class RealmDBHandler extends Handler { let document = { _id: uuidv4(), [endpointId]: message.id }; nodes.forEach(({ name, value }) => { + const prop = this.transformEndpointFromMessageNode(name); endpoints.push(name); - document[name] = value; + document[prop] = value; }); const databaseName = database[message.tree].databaseName; @@ -123,7 +102,7 @@ class RealmDBHandler extends Handler { ); const updateMessage = await this.#getMessageData(readMessage, ws); console.log(updateMessage); - this.sendMessageToClients(JSON.stringify(updateMessage)); + this.sendMessageToClients(updateMessage); } catch (error) { console.error("Error writing object changes in Realm:", error); this.sendMessageToClient(ws, { error: "Error writing object changes" }); @@ -161,17 +140,13 @@ class RealmDBHandler extends Handler { }); } else { console.log(`Object could not be subscribed`); - this.sendMessageToClient( - ws, - JSON.stringify({ error: "Object could not be subscribed" }) - ); + this.sendMessageToClient(ws, { + error: "Object could not be subscribed", + }); } } else { console.log(`Object not found`); - this.sendMessageToClient( - ws, - JSON.stringify({ error: "Object not found" }) - ); + this.sendMessageToClient(ws, { error: "Object not found" }); } } catch (error) { console.error("Error subscribing to object changes in Realm:", error); @@ -193,7 +168,7 @@ class RealmDBHandler extends Handler { const mediaElement = await this.#getMediaElement(message, ws); console.log("mediaElement: ", mediaElement); if (mediaElement) { - const responseNodes = parseReadResponse(message, mediaElement); + const responseNodes = this.#parseReadResponse(message, mediaElement); return this.createOrUpdateMessage(message, responseNodes, "update"); } else { throw new Error(`No data found with the Id: ${message.id}`); @@ -216,10 +191,7 @@ class RealmDBHandler extends Handler { .filtered(`${endpointId} = '${id}'`)[0]; } catch (error) { console.error("Error reading object from Realm:", error); - this.sendMessageToClient( - ws, - JSON.stringify({ error: "Error reading object" }) - ); + this.sendMessageToClient(ws, { error: "Error reading object" }); } } @@ -244,10 +216,30 @@ class RealmDBHandler extends Handler { "update" ); console.log(updateMessage); - this.sendMessageToClients(JSON.stringify(updateMessage)); + this.sendMessageToClients(updateMessage); } } } + + /** + * Parses the response from a read event. + * + * @param {Object} message - The message object containing node or nodes information. + * @param {Object} queryResponseObj - The query response object containing values to be mapped. + * @returns {Object} - A data object with keys from the message nodes and values from the query response. + */ + #parseReadResponse(message, queryResponseObj) { + const data = []; + const nodes = message.node ? [message.node] : message.nodes; + nodes.forEach((node) => { + const prop = this.transformEndpointFromMessageNode(node.name); + data.push({ + name: node.name, + value: queryResponseObj[prop], + }); + }); + return data; + } } module.exports = RealmDBHandler; diff --git a/cdsp/information-layer/router/src/.env b/cdsp/information-layer/router/src/.env index 16ac659..bcaf5de 100644 --- a/cdsp/information-layer/router/src/.env +++ b/cdsp/information-layer/router/src/.env @@ -1 +1 @@ -HANDLER_TYPE=realmdb \ No newline at end of file +HANDLER_TYPE=iotdb \ No newline at end of file