diff --git a/FKala.Core/KalaQl/KalaQuery.cs b/FKala.Core/KalaQl/KalaQuery.cs index 622ac55..2654ccc 100644 --- a/FKala.Core/KalaQl/KalaQuery.cs +++ b/FKala.Core/KalaQl/KalaQuery.cs @@ -149,7 +149,7 @@ public KalaQuery FromQuery(string queryText) case "Insert": return new Op_Insert(line, fields[1].Trim(':'), fields[2], fields[3]); case "Expr": - return new Op_Expresso(line, fields[1].Trim(':'), fields[2]); + return new Op_Expresso(line, fields[1].Trim(':'), fields[2].Replace('\'', '"')); case "Publ": return new Op_Publish(line, fields[1].Split(",", StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries).ToList(), ParsePublishMode(fields[2])); case "Mgmt": diff --git a/FKala.Core/KalaQl/Op_BaseQuery.cs b/FKala.Core/KalaQl/Op_BaseQuery.cs index d42743a..954dc56 100644 --- a/FKala.Core/KalaQl/Op_BaseQuery.cs +++ b/FKala.Core/KalaQl/Op_BaseQuery.cs @@ -37,12 +37,8 @@ public override bool CanExecute(KalaQlContext context) public override void Execute(KalaQlContext context) { - //var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, DoSortRawFiles); + var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, context, DontInvalidateCache_ForUseWhileCacheRebuild).ToList(); -//TODODODODODODDODODO REBUILD IST SOMIT DEFEKT - // bei ForceRebuild auch ohne Ausgabe etc. den Rebuild durchführen, ..was erst geschieht beim Materialisieren - //if (CacheResolution.ForceRebuild) result = result.ToList(); - context.IntermediateDatasources.Add( new ResultPromise() { @@ -52,11 +48,24 @@ public override void Execute(KalaQlContext context) Creator = this, ResultsetFactory = () => { - var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, context, DontInvalidateCache_ForUseWhileCacheRebuild); - return result; + // source file streaming + //var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, context, DontInvalidateCache_ForUseWhileCacheRebuild); + //return result; + + //in-mem copy + return Clone(result); } }); this.hasExecuted = true; } + + + private IEnumerable Clone(IEnumerable input) + { + foreach(var dp in input) + { + yield return dp.Clone(); + } + } } } diff --git a/FKala.Core/KalaQl/Op_Expresso.cs b/FKala.Core/KalaQl/Op_Expresso.cs index 47b3dd8..c0fd08a 100644 --- a/FKala.Core/KalaQl/Op_Expresso.cs +++ b/FKala.Core/KalaQl/Op_Expresso.cs @@ -8,6 +8,7 @@ using Newtonsoft.Json.Linq; using System; using System.Runtime.CompilerServices; +using System.Runtime.Intrinsics.Arm; namespace FKala.Core.KalaQl { @@ -30,7 +31,17 @@ public Op_Expresso(string? line, string name, string expresso) : base(line) UnknownIdInfo = Interpreter.DetectIdentifiers(Expresso); var parameters = UnknownIdInfo.UnknownIdentifiers .Order() - .Select(identifier => new Parameter(identifier, typeof(DataPoint))); + .Select(identifier => { + if (identifier == "previousInput") + { + return new Parameter(identifier, typeof(Dictionary)); + } + else + { + return new Parameter(identifier, typeof(DataPoint)); + } + + }); Lambda = Interpreter.Parse(Expresso, parameters.ToArray()); } @@ -40,7 +51,10 @@ public class Skip { } public override bool CanExecute(KalaQlContext context) { var IdInfo = Interpreter.DetectIdentifiers(Expresso); - return IdInfo.UnknownIdentifiers.All(id => context.IntermediateDatasources.Any(res => res.Name == id)); + var missingIdentifiers = IdInfo.UnknownIdentifiers.ToList(); + missingIdentifiers.Remove("previousInput"); + missingIdentifiers.Remove("previousOutput"); + return missingIdentifiers.All(id => context.IntermediateDatasources.Any(res => res.Name == id)); } public override void Execute(KalaQlContext context) @@ -64,19 +78,34 @@ public override void Execute(KalaQlContext context) this.hasExecuted = true; } - DateTime firstStartTime; + public IEnumerable ExecuteInternal(KalaQlContext context, IEnumerable datenquellen) { + Dictionary previousInput = new Dictionary(); + foreach (string id in UnknownIdInfo.UnknownIdentifiers) + { + if (id == "previousInput") continue; + previousInput[id] = Pools.DataPoint.Get(); + previousInput[id].Value = 0; + } + + + var pInitial = Pools.DataPoint.Get(); + pInitial.Value = 0; + DataPoint? previousOutput = pInitial; var combined = new List<(DateTime Timestamp, int ListIndex, ResultPromise Item)>(); - bool isFirstStartTime = true; foreach (var timeSynchronizedItems in DatasetsCombiner2.CombineSynchronizedResults(datenquellen.ToList())) { - if (isFirstStartTime) - { - firstStartTime = timeSynchronizedItems.First().DataPoint.StartTime; - } + var missingIdentifiers = UnknownIdInfo.UnknownIdentifiers.ToList(); - var paramValues = new List(); + var paramValues = new List + { + new Parameter("previousInput", previousInput), + new Parameter("previousOutput", previousOutput) + }; + missingIdentifiers.Remove("previousInput"); + missingIdentifiers.Remove("previousOutput"); + foreach (var si in timeSynchronizedItems) { paramValues.Add(new Parameter(si.Result.Name, si.DataPoint)); @@ -89,7 +118,12 @@ public IEnumerable ExecuteInternal(KalaQlContext context, IEnumerable localDps.Add(ldp); paramValues.Add(new Parameter(mi, ldp)); } - + var nextPreviousInput = new Dictionary(); + foreach (var pv in paramValues) + { + if (pv.Name == "previousInput") continue; + nextPreviousInput[pv.Name] = ((DataPoint)pv.Value).Clone(); + } object? result = Lambda.Invoke(paramValues); if (result is Skip) { @@ -102,8 +136,17 @@ public IEnumerable ExecuteInternal(KalaQlContext context, IEnumerable currentDataPoint.StartTime = timeSynchronizedItems.Key.Item1; currentDataPoint.EndTime = timeSynchronizedItems.Key.Item2; currentDataPoint.Value = expressoResultValue; + previousOutput.Value = expressoResultValue; yield return currentDataPoint; } + + //foreach (var item in previousInput.Values) + //{ + // Pools.DataPoint.Return(item); + //} + previousInput = nextPreviousInput; + + } } } diff --git a/FKala.Core/Model/DataPoint.cs b/FKala.Core/Model/DataPoint.cs index f57ae13..3044a42 100644 --- a/FKala.Core/Model/DataPoint.cs +++ b/FKala.Core/Model/DataPoint.cs @@ -56,5 +56,16 @@ public string AsLineData(string measurement) { return $"{measurement} {StartTime.ToString("yyyy-MM-ddTHH:mm:ss.fffffff")} {Value?.ToString() ?? ValueText}"; } + + public DataPoint Clone() + { + var ret = Pools.DataPoint.Get(); + ret.StartTime = new DateTime(StartTime.Ticks); + ret.EndTime = new DateTime(EndTime.Ticks); + ret.Value = Value; + ret.ValueText = ValueText; + ret.Source = Source; + return ret; + } } } \ No newline at end of file