Skip to content

Commit

Permalink
enable running sums and diffs in Expr by "previousInput['var'].Value"…
Browse files Browse the repository at this point in the history
… and "previousOutput.Value"
  • Loading branch information
naxan6 committed Oct 6, 2024
1 parent fc7efdc commit bb88484
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 18 deletions.
2 changes: 1 addition & 1 deletion FKala.Core/KalaQl/KalaQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public KalaQuery FromQuery(string queryText)
case "Insert":
return new Op_Insert(line, fields[1].Trim(':'), fields[2], fields[3]);
case "Expr":
return new Op_Expresso(line, fields[1].Trim(':'), fields[2]);
return new Op_Expresso(line, fields[1].Trim(':'), fields[2].Replace('\'', '"'));
case "Publ":
return new Op_Publish(line, fields[1].Split(",", StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries).ToList(), ParsePublishMode(fields[2]));
case "Mgmt":
Expand Down
23 changes: 16 additions & 7 deletions FKala.Core/KalaQl/Op_BaseQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ public override bool CanExecute(KalaQlContext context)

public override void Execute(KalaQlContext context)
{
//var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, DoSortRawFiles);
var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, context, DontInvalidateCache_ForUseWhileCacheRebuild).ToList();

//TODODODODODODDODODO REBUILD IST SOMIT DEFEKT
// bei ForceRebuild auch ohne Ausgabe etc. den Rebuild durchführen, ..was erst geschieht beim Materialisieren
//if (CacheResolution.ForceRebuild) result = result.ToList();

context.IntermediateDatasources.Add(
new ResultPromise()
{
Expand All @@ -52,11 +48,24 @@ public override void Execute(KalaQlContext context)
Creator = this,
ResultsetFactory = () =>
{
var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, context, DontInvalidateCache_ForUseWhileCacheRebuild);
return result;
// source file streaming
//var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, context, DontInvalidateCache_ForUseWhileCacheRebuild);
//return result;

//in-mem copy
return Clone(result);
}
});
this.hasExecuted = true;
}


private IEnumerable<DataPoint> Clone(IEnumerable<DataPoint> input)
{
foreach(var dp in input)
{
yield return dp.Clone();
}
}
}
}
63 changes: 53 additions & 10 deletions FKala.Core/KalaQl/Op_Expresso.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Newtonsoft.Json.Linq;
using System;
using System.Runtime.CompilerServices;
using System.Runtime.Intrinsics.Arm;

namespace FKala.Core.KalaQl
{
Expand All @@ -30,7 +31,17 @@ public Op_Expresso(string? line, string name, string expresso) : base(line)
UnknownIdInfo = Interpreter.DetectIdentifiers(Expresso);
var parameters = UnknownIdInfo.UnknownIdentifiers
.Order()
.Select(identifier => new Parameter(identifier, typeof(DataPoint)));
.Select(identifier => {
if (identifier == "previousInput")
{
return new Parameter(identifier, typeof(Dictionary<string, DataPoint>));
}
else
{
return new Parameter(identifier, typeof(DataPoint));
}

});

Lambda = Interpreter.Parse(Expresso, parameters.ToArray());
}
Expand All @@ -40,7 +51,10 @@ public class Skip { }
public override bool CanExecute(KalaQlContext context)
{
var IdInfo = Interpreter.DetectIdentifiers(Expresso);
return IdInfo.UnknownIdentifiers.All(id => context.IntermediateDatasources.Any(res => res.Name == id));
var missingIdentifiers = IdInfo.UnknownIdentifiers.ToList();
missingIdentifiers.Remove("previousInput");
missingIdentifiers.Remove("previousOutput");
return missingIdentifiers.All(id => context.IntermediateDatasources.Any(res => res.Name == id));
}

public override void Execute(KalaQlContext context)
Expand All @@ -64,19 +78,34 @@ public override void Execute(KalaQlContext context)
this.hasExecuted = true;

}
DateTime firstStartTime;

public IEnumerable<DataPoint> ExecuteInternal(KalaQlContext context, IEnumerable<ResultPromise> datenquellen)
{
Dictionary<string, DataPoint> previousInput = new Dictionary<string, DataPoint>();
foreach (string id in UnknownIdInfo.UnknownIdentifiers)
{
if (id == "previousInput") continue;
previousInput[id] = Pools.DataPoint.Get();
previousInput[id].Value = 0;
}


var pInitial = Pools.DataPoint.Get();
pInitial.Value = 0;
DataPoint? previousOutput = pInitial;
var combined = new List<(DateTime Timestamp, int ListIndex, ResultPromise Item)>();
bool isFirstStartTime = true;
foreach (var timeSynchronizedItems in DatasetsCombiner2.CombineSynchronizedResults(datenquellen.ToList()))
{
if (isFirstStartTime)
{
firstStartTime = timeSynchronizedItems.First().DataPoint.StartTime;
}

var missingIdentifiers = UnknownIdInfo.UnknownIdentifiers.ToList();
var paramValues = new List<Parameter>();
var paramValues = new List<Parameter>
{
new Parameter("previousInput", previousInput),
new Parameter("previousOutput", previousOutput)
};
missingIdentifiers.Remove("previousInput");
missingIdentifiers.Remove("previousOutput");

foreach (var si in timeSynchronizedItems)
{
paramValues.Add(new Parameter(si.Result.Name, si.DataPoint));
Expand All @@ -89,7 +118,12 @@ public IEnumerable<DataPoint> ExecuteInternal(KalaQlContext context, IEnumerable
localDps.Add(ldp);
paramValues.Add(new Parameter(mi, ldp));
}

var nextPreviousInput = new Dictionary<string, DataPoint>();
foreach (var pv in paramValues)
{
if (pv.Name == "previousInput") continue;
nextPreviousInput[pv.Name] = ((DataPoint)pv.Value).Clone();
}
object? result = Lambda.Invoke(paramValues);
if (result is Skip)
{
Expand All @@ -102,8 +136,17 @@ public IEnumerable<DataPoint> ExecuteInternal(KalaQlContext context, IEnumerable
currentDataPoint.StartTime = timeSynchronizedItems.Key.Item1;
currentDataPoint.EndTime = timeSynchronizedItems.Key.Item2;
currentDataPoint.Value = expressoResultValue;
previousOutput.Value = expressoResultValue;
yield return currentDataPoint;
}

//foreach (var item in previousInput.Values)
//{
// Pools.DataPoint.Return(item);
//}
previousInput = nextPreviousInput;


}
}
}
Expand Down
11 changes: 11 additions & 0 deletions FKala.Core/Model/DataPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,16 @@ public string AsLineData(string measurement)
{
return $"{measurement} {StartTime.ToString("yyyy-MM-ddTHH:mm:ss.fffffff")} {Value?.ToString() ?? ValueText}";
}

public DataPoint Clone()
{
var ret = Pools.DataPoint.Get();
ret.StartTime = new DateTime(StartTime.Ticks);
ret.EndTime = new DateTime(EndTime.Ticks);
ret.Value = Value;
ret.ValueText = ValueText;
ret.Source = Source;
return ret;
}
}
}

0 comments on commit bb88484

Please sign in to comment.