Skip to content

Commit

Permalink
better ui; insert operation; conitional datapoint/window skippping (s…
Browse files Browse the repository at this point in the history
…ee example in READE.MD)
  • Loading branch information
naxan6 committed Oct 3, 2024
1 parent 06a008f commit 7d04981
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 22 deletions.
11 changes: 11 additions & 0 deletions FKala.Api/README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,17 @@ Can be used to calculate with and combine datasets.
##### Examples
- ```Expr StromVerbrauch_KWH: "(StromVerbrauch_WH.Value) / 1000"``` - divide the value of StromVerbrauch_WH by 1000 an create a new dataset named StromVerbrauch_KWH by the result.
- ```Expr PVSumInWatt: "(PV1_Windowed.Value + PV2_Windowed.Value) * 1000"``` - add PV1 and PV2 together and multiply to get W instead of kW, name the result PVSumInWatt.
- ```# skipping requires casting the value to (object) and skips the datapoint, null on the other hand results in a datapoint with value null.
Var $FROM 2024-08-01T00:00:00Z
Var $TO 2024-08-02T00:00:00Z
Load rPV1A: Sofar/measure/PVInput1/0x585_Current_PV1[A] $FROM $TO NoCache
Load rPV2A: Sofar/measure/PVInput1/0x588_Current_PV2[A] $FROM $TO NoCache
Expr Filtered1: "rPV1A.Value < 4 ? (object)rPV1A.Value : skip"
Expr Filtered2: "rPV2A.Value < 4 ? rPV2A.Value : null"
#Insert Ins1: Filtered1 Sofar/measure/PVInput1/0x585_Current_PV1[A]CLEANED
#Insert Ins2: Filtered2 Sofar/measure/PVInput1/0x588_Current_PV2[A]CLEANED
Publ "Filtered1,Filtered2" Table
```

### Publ
Can be used to "publish" generated temporary datasets to the result. This is what you get.
Expand Down
14 changes: 8 additions & 6 deletions FKala.Api/wwwroot/ui/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,32 @@

const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');

function datePrefix() {
return "<span style='color:grey;'>"+new Date().toISOString() + " # </span>";
}
var firstline = true;
// Function to read each chunk from the stream
async function readStream() {
const outputDiv = document.getElementById('output');
const { done, value } = await reader.read();
if (done) {
console.log('Stream complete');
outputDiv.innerHTML += "<br/>===========================<br/>"
outputDiv.innerHTML += "<br/>" + datePrefix() + "Stream Complete ===========================<br/>"
return;
}

// Decode and display the chunk
const chunk = decoder.decode(value, { stream: true });
if (firstline) {
outputDiv.innerHTML += new Date().toISOString() + " # ";
outputDiv.innerHTML += "<br/>" + datePrefix() + "Stream Start ===========================<br/>"
firstline = false;
}
outputDiv.innerHTML += chunk.replaceAll("{", "<br>" + new Date().toISOString() + " # " + "{");
outputDiv.innerHTML += chunk.replaceAll("{", "<br>" + datePrefix() + "{");
autoScroll();

if (cancelled) {
reader.cancel("user");
outputDiv.innerHTML += "<br/>Cancelled ===========================<br/>"
outputDiv.innerHTML += "<br/>" + datePrefix() + "Stream Cancelled ===========================<br/>"
return;
}

Expand All @@ -121,7 +123,7 @@
URL: <br />
<input id="url" value="http://localhost:5258/api/StreamQuery" size="80"></input><br />
Query: <br />
<textarea id="query" name="Text1" cols="80" rows="15">Mgmt ListMeasurements</textarea>
<textarea id="query" name="Text1" cols="80" rows="15" spellcheck="false">Mgmt ListMeasurements</textarea>
<br />
<button id="send">Send</button>&nbsp;&nbsp;<button id="cancel">Cancel</button>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<button id="clear">Clear</button>
<div id="output" class="scrollable-div"></div>
Expand Down
8 changes: 5 additions & 3 deletions FKala.Core/DataLayer/DataLayer_Readable_Caching_V1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,10 @@ public async IAsyncEnumerable<Dictionary<string, object>> MoveMeasurement(string
yield return new Dictionary<string, object>() { { "msg", $"renamed {measurementPathOld} to {measurementPathNew} (backup in {measurementPathNewBak}" } };
}

public void Insert(string rawData)

public void Insert(string measurement, DataPoint dataPoint, string? source = "input")
{
Insert(rawData, "input");
Insert(dataPoint.AsLineData(measurement), source);
}

/// <summary>
Expand All @@ -219,7 +220,7 @@ public void Insert(string rawData)
/// </summary>
/// <param name="rawData"></param>
/// <param name="locking"></param>
public void Insert(string rawData, string source)
public void Insert(string rawData, string source = "input")
{
if (ShuttingDown)
{
Expand Down Expand Up @@ -459,5 +460,6 @@ public void Shutdown()
this.ShuttingDown = true;
this.Dispose();
}

}
}
10 changes: 9 additions & 1 deletion FKala.Core/Helper/FilesystemHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@ public static void DirectoryCopy(string sourceDirName, string destDirName, bool
foreach (FileInfo file in files)
{
string temppath = Path.Combine(destDirName, file.Name);
file.CopyTo(temppath, true);
try
{
file.CopyTo(temppath, false);
}
catch (IOException iex)
{
string iextemppath = Path.Combine(destDirName, $"{DateTime.Now.ToString("s")}_COPY_" + file.Name);
file.CopyTo(iextemppath, false);
}
}

if (copySubDirs)
Expand Down
6 changes: 3 additions & 3 deletions FKala.Core/Interfaces/IDataLayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public interface IDataLayer
{
string DataDirectory { get; }
CachingLayer CachingLayer { get; }
IEnumerable<DataPoint> LoadData(string measurement, DateTime startTime, DateTime endTime, CacheResolution cacheResolution, bool NewestOnly, KalaQl.KalaQlContext context, bool dontInvalidateCache_ForUseWhileCacheRebuild);
void Insert(string kalaLinedata);
void Insert(string kalaLinedata, string? source);
IEnumerable<DataPoint> LoadData(string measurement, DateTime startTime, DateTime endTime, CacheResolution cacheResolution, bool NewestOnly, KalaQl.KalaQlContext context, bool dontInvalidateCache_ForUseWhileCacheRebuild);
void Insert(string measurement, DataPoint dataPoint, string? source = null);
void Insert(string kalaLinedata, string? source = null);
List<int> LoadAvailableYears(string measurement);
List<string> LoadMeasurementList();
IEnumerable<DataPoint?> LoadNewestDatapoint(string measurement, KalaQl.KalaQlContext context);
Expand Down
2 changes: 2 additions & 0 deletions FKala.Core/KalaQl/KalaQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public KalaQuery FromQuery(string queryText)
return new Op_Aggregate(line, fields[1].Trim(':'), fields[2], ParseWindow(fields[3]), ParseAggregate(fields[4]), ParseEmptyWindows(fields.Count > 5 ? fields[5] : ""));
case "Inpo":
return new Op_Interpolate(line, fields[1].Trim(':'), fields[2], ParseInterpolationMode(fields[3]), ParseDecimalNullable(fields[4]));
case "Insert":
return new Op_Insert(line, fields[1].Trim(':'), fields[2], fields[3]);
case "Expr":
return new Op_Expresso(line, fields[1].Trim(':'), fields[2]);
case "Publ":
Expand Down
27 changes: 18 additions & 9 deletions FKala.Core/KalaQl/Op_Expresso.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using FKala.Core.Model;
using Newtonsoft.Json.Linq;
using System;
using System.Runtime.CompilerServices;

namespace FKala.Core.KalaQl
{
Expand All @@ -25,7 +26,7 @@ public Op_Expresso(string? line, string name, string expresso) : base(line)

Interpreter = new Interpreter();
Interpreter.SetDefaultNumberType(DefaultNumberType.Decimal);

Interpreter.SetVariable("skip", new Skip());
UnknownIdInfo = Interpreter.DetectIdentifiers(Expresso);
var parameters = UnknownIdInfo.UnknownIdentifiers
.Order()
Expand All @@ -34,6 +35,8 @@ public Op_Expresso(string? line, string name, string expresso) : base(line)
Lambda = Interpreter.Parse(Expresso, parameters.ToArray());
}

public class Skip { }

public override bool CanExecute(KalaQlContext context)
{
var IdInfo = Interpreter.DetectIdentifiers(Expresso);
Expand Down Expand Up @@ -72,7 +75,6 @@ public IEnumerable<DataPoint> ExecuteInternal(KalaQlContext context, IEnumerable
{
firstStartTime = timeSynchronizedItems.First().DataPoint.StartTime;
}

var missingIdentifiers = UnknownIdInfo.UnknownIdentifiers.ToList();
var paramValues = new List<Parameter>();
foreach (var si in timeSynchronizedItems)
Expand All @@ -88,13 +90,20 @@ public IEnumerable<DataPoint> ExecuteInternal(KalaQlContext context, IEnumerable
paramValues.Add(new Parameter(mi, ldp));
}

decimal? expressoResultValue = (decimal?)Lambda.Invoke(paramValues);

var currentDataPoint = Pools.DataPoint.Get();
currentDataPoint.StartTime = timeSynchronizedItems.Key.Item1;
currentDataPoint.EndTime = timeSynchronizedItems.Key.Item2;
currentDataPoint.Value = expressoResultValue;
yield return currentDataPoint;
object? result = Lambda.Invoke(paramValues);
if (result is Skip)
{
continue;
}
else
{
decimal? expressoResultValue = (decimal?)result;
var currentDataPoint = Pools.DataPoint.Get();
currentDataPoint.StartTime = timeSynchronizedItems.Key.Item1;
currentDataPoint.EndTime = timeSynchronizedItems.Key.Item2;
currentDataPoint.Value = expressoResultValue;
yield return currentDataPoint;
}
}
}
}
Expand Down
77 changes: 77 additions & 0 deletions FKala.Core/KalaQl/Op_Insert.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using FKala.Core.DataLayer.Infrastructure;
using FKala.Core.Helper;
using FKala.Core.Interfaces;
using FKala.Core.KalaQl.Windowing;
using FKala.Core.Logic;
using FKala.Core.Model;
using System.Runtime.Intrinsics.Arm;

namespace FKala.Core.KalaQl
{
public class Op_Insert : Op_Base, IKalaQlOperation
{
public string Name { get; }
public string InputDataSetName { get; }
public string TargetMeasure { get; }


public Op_Insert(string? line, string name, string inputDataSet, string targetMeasure) : base(line)
{
Name = name;
InputDataSetName = inputDataSet;
TargetMeasure = targetMeasure;
}

public override bool CanExecute(KalaQlContext context)
{
return context.IntermediateDatasources.Any(x => x.Name == InputDataSetName);
}

public override void Execute(KalaQlContext context)
{
//var input = context.IntermediateResults.First(x => x.Name == InputDataSetName);
// Dieses ToList ist wichtig, da bei nachfolgendem Expresso mit Vermarmelung mehrerer Serien
// und gleichzeitiger Ausgabe aller dieser Serien im Publish
// sich die Zugriffe auf den Enumerable überschneiden und das ganze dann buggt
// (noch nicht final geklärt, z.B. siehe BUGTEST_KalaQl_2_Datasets_Aggregated_Expresso).
var input = context.IntermediateDatasources.First(x => x.Name == InputDataSetName);

var outgoingResult =
new ResultPromise()
{
Name = this.Name,
Creator = this,
Query_StartTime = input.Query_StartTime,
Query_EndTime = input.Query_EndTime,
ResultsetFactory = () =>
{
var resultset = InternalExecute(context, input);
return resultset;
}
};

context.IntermediateDatasources.Add(outgoingResult);
this.hasExecuted = true;
}

private IEnumerable<DataPoint> InternalExecute(KalaQlContext context, ResultPromise input)
{
var enumerable = input.ResultsetFactory();
var dataPointsEnumerator = enumerable.GetEnumerator();

int count = 0;
foreach (var dp in enumerable)
{
context.DataLayer.Insert(TargetMeasure, dp, $"Op_Insert <{Name}>");
count++;
}
yield return new DataPoint()
{
StartTime = input.Query_StartTime,
EndTime = input.Query_EndTime,
Source = Name,
ValueText = $"Inserted {count} datapoints into {TargetMeasure}"
};
}
}
}
6 changes: 6 additions & 0 deletions FKala.Core/Model/DataPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -50,5 +51,10 @@ public int GetHashCode([DisallowNull] DataPoint obj)
{
return $"{obj.StartTime.Ticks} # {obj.Value} # {obj.ValueText} #".GetHashCode();
}

public string AsLineData(string measurement)
{
return $"{measurement} {StartTime.ToString("yyyy-MM-ddTHH:mm:ss.fffffff")} {Value?.ToString() ?? ValueText}";
}
}
}

0 comments on commit 7d04981

Please sign in to comment.