Skip to content

Commit

Permalink
optimized streaming, handling error on sortmarking while doing parall…
Browse files Browse the repository at this point in the history
…el querying to the same file, added Loaj (load from json)
  • Loading branch information
naxan6 committed Oct 1, 2024
1 parent 2ff6bda commit d819a12
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 3 deletions.
7 changes: 6 additions & 1 deletion FKala.Api/Controller/StreamQueryController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@ public StreamQueryController(IDataLayer dataLayer, ILogger<QueryController> logg
// Verarbeite den String hier nach Bedarf
var inputmultiline = ProcessString(input);

DateTime previous = DateTime.Now;
await foreach (var retRowDict in DoQuery(inputmultiline))
{
await Task.Delay(TimeSpan.FromMilliseconds(1));
if (previous.AddMilliseconds(250) < DateTime.Now)
{
previous = DateTime.Now;
await Task.Delay(TimeSpan.FromMilliseconds(1));
}
yield return retRowDict;
}

Expand Down
6 changes: 5 additions & 1 deletion FKala.Api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddControllers(
options => options.InputFormatters.Add(new PlainTextFormatter())
);
).AddJsonOptions((options) =>
{
//options.JsonSerializerOptions.DefaultBufferSize = 4096;
});

//.AddNewtonsoftJson(options =>
// {
// options.SerializerSettings.DateFormatString = "yyyy'-'MM'-'dd'T'HH':'mm':'ssZ";
Expand Down
11 changes: 11 additions & 0 deletions FKala.Api/README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ Loads data from storage
it uses a cache with minutely aggregated data. "Max" was used to condense the data to minutely values.
if the cache doesn't exist yet, it gets populated. if it does exists, it is NOT used - but recreated (because of "_Rebuild").

### Loaj
Nearly like Load but allows access to a member of a json value.
See example, only a path to a json-member is added on top of Load-parameters ("data/general/freeHeap").
Array-indices are only supported as leaves, e.g. "nrg[0]" will get you the first entry, so '{ "nrg" : [11,22,33] }' will give you 11.
```
Loaj rFreeHeap: nxG1`$getStatus`$response data/general/freeHeap 2024-09-22T00:00:00 2024-09-30T00:00:00 NoCache
Loaj rUptime: nxG1`$getStatus`$response data/general/uptime64 2024-09-22T00:00:00 2024-09-30T00:00:00 NoCache
Aggr a: rFreeHeap Aligned_1Hour Min
Aggr b: rUptime Aligned_1Hour Min
Publ a,b Table"
```
### Aggr
Aggregates Data.
#### Pattern ```Aggr <Name>: <Source> <Window> <Aggregate> [EmptyWindows]```
Expand Down
8 changes: 7 additions & 1 deletion FKala.Core/DataLayer/StorageAccess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,13 @@ private void MarkFileAsSorted(string currentPath)
char[] newPath = currentPath.ToCharArray();// "measure$aasd_2024-11-02.dat"
newPath[newPath.Length - 15] = '#'; // "<measure$aasd#2024-11-02.dat"
string sortedMarkedPath = new string(newPath);
File.Move(currentPath, sortedMarkedPath);
try
{
File.Move(currentPath, sortedMarkedPath);
} catch (Exception)
{
Console.WriteLine("failed renaming to sorted. maybe already marked sorted by parallel stream");
}
}

static bool IsSorted<T>(List<T> list) where T : IComparable<T>
Expand Down
7 changes: 7 additions & 0 deletions FKala.Core/KalaQl/KalaQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ public KalaQuery FromQuery(string queryText)
}
if (fields.Count < 6) throw new Exception($"6 Parameters needed. Example: Load NAME: mesaurename 0001-01-01T00:00:00 9999-12-31T00:00:00 NoCache. But got: {line}");
return new Op_BaseQuery(line, fields[1].Trim(':'), fields[2], ParseDateTime(fields[3]), ParseDateTime(fields[4]), ParseCacheResolution(fields[5]));
case "Loaj":
if (fields[3] == "NewestOnly")
{
return new Op_JsonQuery(line, fields[1].Trim(':'), fields[2], fields[3], DateTime.MinValue, DateTime.MaxValue, CacheResolutionPredefined.NoCache, true);
}
if (fields.Count < 6) throw new Exception($"6 Parameters needed. Example: Load NAME: mesaurename 0001-01-01T00:00:00 9999-12-31T00:00:00 NoCache. But got: {line}");
return new Op_JsonQuery(line, fields[1].Trim(':'), fields[2], fields[3], ParseDateTime(fields[4]), ParseDateTime(fields[5]), ParseCacheResolution(fields[6]));
case "Aggr":
return new Op_Aggregate(line, fields[1].Trim(':'), fields[2], ParseWindow(fields[3]), ParseAggregate(fields[4]), ParseEmptyWindows(fields.Count > 5 ? fields[5] : ""));
case "Inpo":
Expand Down
129 changes: 129 additions & 0 deletions FKala.Core/KalaQl/Op_JsonQuery.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
using FKala.Core.DataLayer.Infrastructure;
using FKala.Core.Interfaces;
using FKala.Core.KalaQl.Windowing;
using FKala.Core.Model;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace FKala.Core.KalaQl
{
public class Op_JsonQuery : Op_Base, IKalaQlOperation
{
public string Name { get; }
public string Measurement { get; }
public DateTime StartTime { get; }
public DateTime EndTime { get; }
public CacheResolution CacheResolution { get; }
public bool NewestOnly { get; }
public bool DoSortRawFiles { get; }
public string FieldPath { get; }

public Op_JsonQuery(string? line, string name, string measurement, string fieldPath, DateTime startTime, DateTime endTime, CacheResolution cacheResolution, bool newestOnly = false, bool doSortRawFiles = false) : base(line)
{
this.Name = name;
this.Measurement = measurement;
this.StartTime = startTime;
this.EndTime = endTime;
this.CacheResolution = cacheResolution;
this.NewestOnly = newestOnly;
this.DoSortRawFiles = doSortRawFiles;
this.FieldPath = fieldPath;
}

public override bool CanExecute(KalaQlContext context)
{
return true;
}

public override void Execute(KalaQlContext context)
{
//var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, DoSortRawFiles);

//TODODODODODODDODODO REBUILD IST SOMIT DEFEKT
// bei ForceRebuild auch ohne Ausgabe etc. den Rebuild durchführen, ..was erst geschieht beim Materialisieren
//if (CacheResolution.ForceRebuild) result = result.ToList();

var pathParts = FieldPath.Split("/");

context.IntermediateDatasources.Add(
new ResultPromise()
{
Name = this.Name,
Query_StartTime = StartTime,
Query_EndTime = EndTime,
Creator = this,
ResultsetFactory = () =>
{
var result = context.DataLayer.LoadData(this.Measurement, this.StartTime, this.EndTime, CacheResolution, NewestOnly, DoSortRawFiles, context);
return ReadJson(result, pathParts);
}
});
this.hasExecuted = true;
}

public IEnumerable<DataPoint> ReadJson(IEnumerable<DataPoint> jsonEnum, string[]? pathParts)
{
int index = -1;
if (pathParts[pathParts.Length - 1].EndsWith("]"))
{
var ps = pathParts[pathParts.Length - 1].Split('[');
pathParts[pathParts.Length - 1] = ps[0];
index = int.Parse(ps[1].TrimEnd(']'));
}
foreach (var item in jsonEnum)
{
var jsonDict = JsonConvert.DeserializeObject<Dictionary<string, object>>(item.ValueText);
foreach (var pathPart in pathParts)
{
if (jsonDict.ContainsKey(pathPart))
{
object jsonDictEntry = jsonDict[pathPart];
if (jsonDictEntry is JToken)
{
if ((jsonDictEntry as JToken).Type == JTokenType.Object)
{
jsonDict = (jsonDictEntry as JObject).ToObject<Dictionary<string, object>>();
}
else if ((jsonDictEntry as JToken).Type == JTokenType.Array)
{
var jarray = (jsonDictEntry as JArray);
var dp = Pools.DataPoint.Get();
dp.StartTime = item.StartTime;
dp.EndTime = item.EndTime;
dp.Source = item.Source;
dp.ValueText = jarray.ToList()[index].ToString();
yield return dp;
}
}
else
{
if (decimal.TryParse(jsonDictEntry.ToString(), out decimal decimalValue))
{
var dp = Pools.DataPoint.Get();
dp.StartTime = item.StartTime;
dp.EndTime = item.EndTime;
dp.Source = item.Source;
dp.Value = decimalValue;
yield return dp;
}
else
{
var dp = Pools.DataPoint.Get();
dp.StartTime = item.StartTime;
dp.EndTime = item.EndTime;
dp.Source = item.Source;
dp.ValueText = jsonDictEntry.ToString();
yield return dp;
}
}
}
}
}
}
}
}

0 comments on commit d819a12

Please sign in to comment.