Skip to content

Commit

Permalink
Implemented aggregate of type "count"
Browse files Browse the repository at this point in the history
  • Loading branch information
GermanBluefox committed Sep 19, 2023
1 parent e04931c commit 0a3a40a
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 50 deletions.
59 changes: 43 additions & 16 deletions lib/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ function initAggregate(options) {
err.message += `: ${options.maxIndex + 2}`;
throw err;
}
// We define the array length but do not prefill values, we do that on runtime when needed
// We define the array length but do not prefill values, we do that on runtime when needed
options.result[0] = {
val: {ts: null, val: null},
max: {ts: null, val: null},
Expand Down Expand Up @@ -108,15 +108,15 @@ function aggregation(options, data) {
// store all border values
if (preIndex < 0) {
index = 0;
// if the ts is even earlier than the "pre interval" ignore it, else we collect all data there
// if the ts is even earlier than the "pre-interval" ignore it, else we collect all data there
if (preIndex < -1) {
collectedTooEarlyData.push(data[i]);
continue;
}
preIndexValueFound = true;
} else if (preIndex > options.maxIndex) {
index = options.maxIndex + 2;
// if the ts is even later than the "post interval" ignore it, else we collect all data there
// if the ts is even later than the "post-interval" ignore it, else we collect all data there
if (preIndex > options.maxIndex + 1) {
collectedTooLateData.push(data[i]);
continue;
Expand All @@ -127,16 +127,16 @@ function aggregation(options, data) {
}
options.overallLength++;

if (options.result[index] === undefined) { // lazy initialization of datastructure
if (options.result[index] === undefined) { // lazy initialization of data structure
options.result[index] = {
val: {ts: null, val: null},
max: {ts: null, val: null},
min: {ts: null, val: null},
start: {ts: null, val: null},
end: {ts: null, val: null}
end: {ts: null, val: null},
};

if (options.aggregate === 'average') {
if (options.aggregate === 'average' || options.aggregate === 'count') {
options.averageCount[index] = 0;
}

Expand All @@ -151,20 +151,20 @@ function aggregation(options, data) {
aggregationLogic(data[i], index, options);
}

// If no data was found in the pre interval, but we have earlier data, we put the latest of them in the pre interval
// If no data was found in the pre-interval, but we have earlier data, we put the latest of them in the pre interval
if (!preIndexValueFound && collectedTooEarlyData.length > 0) {
collectedTooEarlyData = collectedTooEarlyData.sort(sortByTs);
options.overallLength++;
aggregationLogic(collectedTooEarlyData[collectedTooEarlyData.length - 1], 0, options);
}
// If no data was found in the post interval, but we have later data, we put the earliest of them in the pre interval
// If no data was found in the post-interval, but we have later data, we put the earliest of them in the pre interval
if (!postIndexValueFound && collectedTooLateData.length > 0) {
collectedTooLateData = collectedTooLateData.sort(sortByTs);
options.overallLength++;
aggregationLogic(collectedTooLateData[0], options.maxIndex + 2, options);
}

return {result: options.result, step: options.step, sourceLength: data.length} ;
return { result: options.result, step: options.step, sourceLength: data.length };
}

function aggregationLogic(data, index, options) {
Expand Down Expand Up @@ -193,6 +193,8 @@ function aggregationLogic(data, index, options) {
} else if (options.aggregate === 'average') {
options.result[index].val.val += parseFloat(data.val);
options.averageCount[index]++;
} else if (options.aggregate === 'count') {
options.averageCount[index]++;
} else if (options.aggregate === 'total') {
options.result[index].val.val += parseFloat(data.val);
} else if (options.aggregate === 'minmax') {
Expand Down Expand Up @@ -260,7 +262,7 @@ function finishAggregation(options) {
} else if (ii === originalResultLength - 1) {
postBorderValueRemoved = true;
}
//options.result.splice(ii, 1);
// options.result.splice(ii, 1);
continue;
}
// just one value in this period: max == min == start == end
Expand Down Expand Up @@ -427,8 +429,33 @@ function finishAggregation(options) {
});
} else {
// no one value in this interval
//options.result.splice(k, 1);
//options.averageCount.splice(k, 1); // not needed to clean up because not used anymore afterwards
// options.result.splice(k, 1);
// options.averageCount.splice(k, 1); // not needed to clean up because not used anymore afterwards
}
}
options.result = finalResult;
} else if (options.aggregate === 'count') {
let startIndex = 0;
let endIndex = options.result.length;
const finalResult = [];
if (options.removeBorderValues) { // we cut out the additional results
// options.result.splice(0, 1);
// options.averageCount.splice(0, 1);
// options.result.length--;
// options.averageCount.length--;
startIndex++;
endIndex--;
}
for (let k = startIndex; k < endIndex; k++) {
if (options.result[k] !== undefined && options.result[k].val.ts) {
finalResult.push({
ts: options.result[k].val.ts,
val: options.averageCount[k],
});
} else {
// no one value in this interval
// options.result.splice(k, 1);
// options.averageCount.splice(k, 1); // not needed to clean up because not used anymore afterward
}
}
options.result = finalResult;
Expand All @@ -442,7 +469,7 @@ function finishAggregation(options) {
let indexStartTs = options.start + ((k - 1) * options.step);
let indexEndTs = indexStartTs + options.step;
if (options.integralDatapoints[k] && options.integralDatapoints[k].length) {
// Sort datapoints by ts first
// Sort data points by ts first
options.integralDatapoints[k].sort(sortByTs);
}
// Make sure that we have entries that always start at the beginning of the interval
Expand Down Expand Up @@ -579,8 +606,8 @@ function finishAggregation(options) {
finalResult.push(point);
} else {
// no one value in this interval
//options.result.splice(k, 1);
//options.quantileDatapoints.splice(k, 1); // not needed to clean up because not used anymore afterwards
// options.result.splice(k, 1);
// options.quantileDatapoints.splice(k, 1); // not needed to clean up because not used anymore afterward
}
}
options.result = finalResult;
Expand Down Expand Up @@ -714,7 +741,7 @@ function beautify(options) {
if (postLastValue) {
// if steps
if (options.aggregate === 'onchange' || !options.aggregate) {
// if more data following, draw line to the end of chart
// if more data following, draw line to the end of the chart
if (postLastValue.ts !== lastTS) {
options.result.push({ts: options.end, val: postLastValue.val});
} else {
Expand Down
6 changes: 3 additions & 3 deletions lib/mysql.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ exports.getHistory = function (dbname, db, options) {
subWhere += ` \`${dbname}\`.${db}.id=${options.id}`;
}
if (options.ignoreNull) {
//subWhere += (subWhere ? " AND" : "") + " val <> NULL";
// subWhere += (subWhere ? " AND" : "") + " val <> NULL";
}
subWhere += `${subWhere ? ' AND' : ''} \`${dbname}\`.${db}.ts < ${options.start}`;
if (subWhere) {
Expand All @@ -173,7 +173,7 @@ exports.getHistory = function (dbname, db, options) {
subQuery += ` ORDER BY \`${dbname}\`.${db}.ts DESC LIMIT 1`;
where += ` UNION ALL (${subQuery})`;

//add next value after end
// add next value after end
subQuery = " SELECT ts, val" +
(!options.id ? `, ${db}.id as id` : '') +
(options.ack ? ', ack' : '') +
Expand All @@ -187,7 +187,7 @@ exports.getHistory = function (dbname, db, options) {
subWhere += ` \`${dbname}\`.${db}.id=${options.id}`;
}
if (options.ignoreNull) {
//subWhere += (subWhere ? " AND" : "") + " val <> NULL";
// subWhere += (subWhere ? " AND" : "") + " val <> NULL";
}
subWhere += `${subWhere ? ' AND' : ''} \`${dbname}\`.${db}.ts >= ${options.end}`;
if (subWhere) {
Expand Down
49 changes: 18 additions & 31 deletions main.js
Original file line number Diff line number Diff line change
Expand Up @@ -1040,44 +1040,31 @@ function processMessage(msg) {
} else
if (msg.command === 'getHistory') {
getHistory(msg);
}
else if (msg.command === 'getCounter') {
} else if (msg.command === 'getCounter') {
getCounterDiff(msg);
}
else if (msg.command === 'test') {
} else if (msg.command === 'test') {
testConnection(msg);
}
else if (msg.command === 'destroy') {
} else if (msg.command === 'destroy') {
destroyDB(msg);
}
else if (msg.command === 'query') {
} else if (msg.command === 'query') {
query(msg);
}
else if (msg.command === 'update') {
} else if (msg.command === 'update') {
updateState(msg);
}
else if (msg.command === 'delete') {
} else if (msg.command === 'delete') {
deleteState(msg);
}
else if (msg.command === 'deleteAll') {
} else if (msg.command === 'deleteAll') {
deleteStateAll(msg);
}
else if (msg.command === 'deleteRange') {
} else if (msg.command === 'deleteRange') {
deleteState(msg);
}
else if (msg.command === 'storeState') {
} else if (msg.command === 'storeState') {
storeState(msg);
}
else if (msg.command === 'getDpOverview') {
} else if (msg.command === 'getDpOverview') {
getDpOverview(msg);
}
else if (msg.command === 'enableHistory') {
} else if (msg.command === 'enableHistory') {
enableHistory(msg);
}
else if (msg.command === 'disableHistory') {
} else if (msg.command === 'disableHistory') {
disableHistory(msg);
}
else if (msg.command === 'getEnabledDPs') {
} else if (msg.command === 'getEnabledDPs') {
getEnabledDPs(msg);
} else if (msg.command === 'stopInstance') {
finish(() => {
Expand All @@ -1101,7 +1088,7 @@ function processStartValues(callback) {
lc: state ? now - 4 : now, // 4 is because of MS SQL
ack: true,
q: 0x40,
from: `system.adapter.${adapter.namespace}`
from: `system.adapter.${adapter.namespace}`,
});

if (state) {
Expand Down Expand Up @@ -1259,7 +1246,7 @@ function pushHistory(id, state, timerRelog) {
if (sqlDPs[id].type !== types.number) {
adapter.log.error('Counter must have type "number"!');
} else if (state.val < sqlDPs[id].state.val) {
// if actual value is less then last seen counter, store both values
// if the actual value is less then last seen counter, store both values
pushValueIntoDB(id, sqlDPs[id].state, true);
pushValueIntoDB(id, state, true);
}
Expand Down Expand Up @@ -1290,7 +1277,7 @@ function pushHistory(id, state, timerRelog) {
}

if (settings.debounceTime && !ignoreDebonce && !timerRelog) {
// Discard changes in de-bounce time to store last stable value
// Discard changes in the debounce time to store last stable value
sqlDPs[id].timeout && clearTimeout(sqlDPs[id].timeout);
sqlDPs[id].timeout = setTimeout((id, state) => {
if (!sqlDPs[id]) return;
Expand Down Expand Up @@ -2361,7 +2348,7 @@ function getHistory(msg) {
}
} catch (err) {
return adapter.sendTo(msg.from, msg.command, {
error: 'Invalid call. Start date ' + JSON.stringify(options.start) + ' is not a valid date'
error: `Invalid call. Start date ${JSON.stringify(options.start)} is not a valid date`
}, msg.callback);
}

Expand All @@ -2371,7 +2358,7 @@ function getHistory(msg) {
}
} catch (err) {
return adapter.sendTo(msg.from, msg.command, {
error: 'Invalid call. End date ' + JSON.stringify(options.end) + ' is not a valid date'
error: `Invalid call. End date ${JSON.stringify(options.end)} is not a valid date`
}, msg.callback);
}

Expand Down

0 comments on commit 0a3a40a

Please sign in to comment.