Skip to content

Commit

Permalink
Support dots separation for endpoint naming in realm db
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Wendel <[email protected]>
  • Loading branch information
aw-muc committed Aug 22, 2024
1 parent 56a1109 commit 6d28e89
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 137 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea
40 changes: 32 additions & 8 deletions cdsp/information-layer/handlers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,40 @@ class Handler {
* @returns {Object} - The transformed message.
*/
createOrUpdateMessage(message, nodes, type) {
const transformedNodes = nodes.map((node) => ({
name: node.name,
value: node.value,
}));
return {
...message,
nodes: transformedNodes,
const { id, tree, uuid } = message;
let newMessage = {
type,
timestamp: new Date().toISOString(),
tree,
id,
dateTime: new Date().toISOString(),
uuid,
};
if (nodes.length === 1) {
newMessage["node"] = nodes[0];
} else {
newMessage["nodes"] = nodes;
}
return newMessage;
}

/**
* Transforms a message node by replacing all dots with underscores.
*
* @param {string} node - The message node to transform.
* @returns {string} - The transformed message node with dots replaced by underscores.
*/
transformEndpointFromMessageNode(node) {
return `${node}`.replace(/\./g, "_");
}

/**
* Transforms a database field name by replacing underscores with all dots.
*
* @param {string} field - The database filed to transform.
* @returns {string} - The transformed to message node replacing underscores by dots.
*/
transformEndpointFromDatabaseFields(field) {
return `${field}`.replace(/\_/g, ".");
}
}

Expand Down
117 changes: 65 additions & 52 deletions cdsp/information-layer/handlers/iotdb/src/iotdb-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,6 @@ const { IoTDBDataInterpreter } = require("../utils/IoTDBDataInterpreter");
const endpointsType = require("../config/endpointsType");
const database = require("../config/databaseParams");

/**
* Creates an object to insert data in IoTDB based on the provided message.
*
* @param {Object} message - The message containing data to be inserted.
* @returns {Object} The data object to be inserted.
*/
function createObjectToInsert(message) {
const { id, tree } = message;
const data = { [database[tree].endpointId]: id };
if (message.node) {
data[
IoTDBDataInterpreter.transformEndpointFromMessageNode(message.node.name)
] = message.node.value;
} else if (message.nodes) {
message.nodes.forEach((node) => {
data[IoTDBDataInterpreter.transformEndpointFromMessageNode(node.name)] =
node.value;
});
}
return data;
}

class IoTDBHandler extends Handler {
constructor() {
super();
Expand Down Expand Up @@ -88,17 +66,11 @@ class IoTDBHandler extends Handler {
this.sendMessageToClient(ws, responseMessage);
} else {
console.log("Object not found.");
this.sendMessageToClient(
ws,
JSON.stringify({ error: "Object not found." })
);
this.sendMessageToClient(ws, { error: "Object not found." });
}
} catch (error) {
console.error("Failed to read data from IoTDB: ", error);
this.sendMessageToClient(
ws,
JSON.stringify({ error: "Error reading object" })
);
this.sendMessageToClient(ws, { error: "Error reading object" });
} finally {
this.#closeSessionIfNeeded();
}
Expand All @@ -107,7 +79,7 @@ class IoTDBHandler extends Handler {
async write(message, ws) {
try {
await this.#openSessionIfNeeded();
const data = createObjectToInsert(message);
const data = this.#createObjectToInsert(message);
const errorUndefinedTypes = [];
let measurements = [];
let dataTypes = [];
Expand All @@ -120,7 +92,6 @@ class IoTDBHandler extends Handler {
values.push(value);
} else {
errorUndefinedTypes.push(`The endpoint "${key}" is not supported`);
continue;
}
}

Expand Down Expand Up @@ -161,17 +132,11 @@ class IoTDBHandler extends Handler {
this.sendMessageToClients(responseMessage);
} else {
console.log("Object not found.");
this.sendMessageToClient(
ws,
JSON.stringify({ error: "Object not found." })
);
this.sendMessageToClient(ws, { error: "Object not found." });
}
} catch (error) {
console.error(`Failed writing data to IoTDB. ${error}`);
this.sendMessageToClient(
ws,
JSON.stringify({ error: `Failed writing data. ${error}` })
);
this.sendMessageToClient(ws, { error: `Failed writing data. ${error}` });
} finally {
this.#closeSessionIfNeeded();
}
Expand All @@ -197,15 +162,15 @@ class IoTDBHandler extends Handler {
try {
const resp = await this.client.openSession(openSessionReq);

if (this.protocolVersion != resp.serverProtocolVersion) {
if (this.protocolVersion !== resp.serverProtocolVersion) {
console.log(
"Protocol differ, Client version is " +
this.protocolVersion +
", but Server version is " +
resp.serverProtocolVersion
);
// version is less than 0.10
if (resp.serverProtocolVersion == 0) {
if (resp.serverProtocolVersion === 0) {
throw new Error("Protocol not supported.");
}
}
Expand Down Expand Up @@ -320,8 +285,8 @@ class IoTDBHandler extends Handler {
isAligned = false
) {
if (
values.length != dataTypes.length ||
values.length != measurements.length
values.length !== dataTypes.length ||
values.length !== measurements.length
) {
throw "length of data types does not equal to length of values!";
}
Expand Down Expand Up @@ -351,15 +316,14 @@ class IoTDBHandler extends Handler {
async #queryLastFields(message, ws) {
const { id: objectId, tree } = message;
const { databaseName, endpointId } = database[tree];
const fieldsToSearch =
IoTDBDataInterpreter.extractEndpointsFromNodes(message).join(", ");
const fieldsToSearch = this.#extractEndpointsFromNodes(message).join(", ");
const sql = `SELECT ${fieldsToSearch} FROM ${databaseName} WHERE ${endpointId} = '${objectId}' ORDER BY Time ASC`;

try {
const sessionDataSet = await this.#executeQueryStatement(sql);

if (!sessionDataSet || Object.keys(sessionDataSet).length === 0) {
throw new Error({ error: "Internal error constructing read object." });
throw new Error("Internal error constructing read object.");
}

const mediaElements = [];
Expand All @@ -368,13 +332,21 @@ class IoTDBHandler extends Handler {
}

let latestValues = {};

mediaElements.forEach((mediaElement) => {
// extract underscores from media element key
const transformedMediaElement = Object.fromEntries(
Object.entries(mediaElement).map(([key, value]) => {
const newKey = this.transformEndpointFromDatabaseFields(key);
return [newKey, value];
})
);

const transformedObject =
IoTDBDataInterpreter.extractNodesFromTimeseries(
mediaElement,
transformedMediaElement,
databaseName
);

Object.entries(transformedObject).forEach(([key, value]) => {
if (value !== null) {
latestValues[key] = value;
Expand All @@ -388,11 +360,52 @@ class IoTDBHandler extends Handler {
}));
} catch (error) {
console.error(error);
this.sendMessageToClient(
ws,
JSON.stringify({ error: "Internal error constructing read object." })
this.sendMessageToClient(ws, {
error: "Internal error constructing read object.",
});
}
}

/**
* Extracts endpoint names from the given message.
*
* This function checks if the message has a single node or multiple nodes and
* extracts the names accordingly.
*
* @param {Object} message - The message containing node(s).
* @returns {Array<string>} An array of endpoint names.
*/
#extractEndpointsFromNodes(message) {
let endpoints = [];

if (message.node) {
endpoints.push(this.transformEndpointFromMessageNode(message.node.name));
} else if (message.nodes) {
endpoints = message.nodes.map((node) =>
this.transformEndpointFromMessageNode(node.name)
);
}
return endpoints;
}

/**
* Creates an object to insert data in IoTDB based on the provided message.
*
* @param {Object} message - The message containing data to be inserted.
* @returns {Object} The data object to be inserted.
*/
#createObjectToInsert(message) {
const { id, tree } = message;
const data = { [database[tree].endpointId]: id };
if (message.node) {
data[this.transformEndpointFromMessageNode(message.node.name)] =
message.node.value;
} else if (message.nodes) {
message.nodes.forEach((node) => {
data[this.transformEndpointFromMessageNode(node.name)] = node.value;
});
}
return data;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class IoTDBDataInterpreter {
static extractNodesFromTimeseries(obj, databaseName) {
return Object.entries(obj).reduce((acc, [key, value]) => {
if (key.startsWith(databaseName)) {
const newKey = key.replace(`${databaseName}.`, "").replace(/_/g, ".");
const newKey = key.replace(`${databaseName}.`, "");
acc[newKey] = value;
}
return acc;
Expand All @@ -108,38 +108,6 @@ class IoTDBDataInterpreter {

return { timestamp };
}

/**
* Extracts endpoint names from the given message.
*
* This function checks if the message has a single node or multiple nodes and
* extracts the names accordingly.
*
* @param {Object} message - The message containing node(s).
* @returns {Array<string>} An array of endpoint names.
*/
static extractEndpointsFromNodes(message) {
let endpoints = [];

if (message.node) {
endpoints.push(this.transformEndpointFromMessageNode(message.node.name));
} else if (message.nodes) {
endpoints = message.nodes.map((node) =>
this.transformEndpointFromMessageNode(node.name)
);
}
return endpoints;
}

/**
* Transforms a message node by replacing all dots with underscores.
*
* @param {string} node - The message node to transform.
* @returns {string} - The transformed message node with dots replaced by underscores.
*/
static transformEndpointFromMessageNode(node) {
return `${node}`.replace(/\./g, "_");
}
}

module.exports = { IoTDBDataInterpreter };
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class IoTDBRpcDataSet {
* @returns {boolean} True if there is a cached result, otherwise false.
*/
#hasCachedResult() {
return this.#queryDataSet !== null && this.#queryDataSet.time.length != 0;
return this.#queryDataSet !== null && this.#queryDataSet.time.length !== 0;
}

/**
Expand Down
Loading

0 comments on commit 6d28e89

Please sign in to comment.