diff --git a/Services.Test/ReplayFileServiceTest.cs b/Services.Test/ReplayFileServiceTest.cs index 44df5ca0..1832cf94 100644 --- a/Services.Test/ReplayFileServiceTest.cs +++ b/Services.Test/ReplayFileServiceTest.cs @@ -48,7 +48,6 @@ public ReplayFileServiceTest() this.target = new ReplayFileService( this.config.Object, this.enginesFactory.Object, - this.replayFilesStorage.Object, this.log.Object); } diff --git a/Services/Concurrency/ConcurrencyConfig.cs b/Services/Concurrency/ConcurrencyConfig.cs index 1bc7dff4..2db1b531 100644 --- a/Services/Concurrency/ConcurrencyConfig.cs +++ b/Services/Concurrency/ConcurrencyConfig.cs @@ -12,6 +12,7 @@ public interface IAppConcurrencyConfig int MaxPendingTelemetry { get; } int MaxPendingTwinWrites { get; } int MinDeviceStateLoopDuration { get; } + int MinDeviceReplayLoopDuration { get; } int MinDeviceConnectionLoopDuration { get; } int MinDeviceTelemetryLoopDuration { get; } int MinDevicePropertiesLoopDuration { get; } @@ -26,6 +27,7 @@ public class AppConcurrencyConfig : IAppConcurrencyConfig private const int DEFAULT_MAX_PENDING_TELEMETRY = 1000; private const int DEFAULT_MAX_PENDING_TWIN_WRITES = 50; private const int DEFAULT_MIN_DEVICE_STATE_LOOP_DURATION = 1000; + private const int DEFAULT_MIN_DEVICE_REPLAY_LOOP_DURATION = 1000; private const int DEFAULT_MIN_DEVICE_CONNECTION_LOOP_DURATION = 1000; private const int DEFAULT_MIN_DEVICE_TELEMETRY_LOOP_DURATION = 500; private const int DEFAULT_MIN_DEVICE_PROPERTIES_LOOP_DURATION = 2000; @@ -42,6 +44,7 @@ public class AppConcurrencyConfig : IAppConcurrencyConfig private int maxPendingTelemetry; private int maxPendingTwinWrites; private int minDeviceStateLoopDuration; + private int minDeviceReplayLoopDuration; private int minDeviceConnectionLoopDuration; private int minDeviceTelemetryLoopDuration; private int minDevicePropertiesLoopDuration; @@ -55,6 +58,7 @@ public AppConcurrencyConfig() this.MaxPendingTelemetry = DEFAULT_MAX_PENDING_TELEMETRY; this.MaxPendingTwinWrites = DEFAULT_MAX_PENDING_TWIN_WRITES; this.MinDeviceStateLoopDuration = DEFAULT_MIN_DEVICE_STATE_LOOP_DURATION; + this.MinDeviceReplayLoopDuration = DEFAULT_MIN_DEVICE_REPLAY_LOOP_DURATION; this.MinDeviceConnectionLoopDuration = DEFAULT_MIN_DEVICE_CONNECTION_LOOP_DURATION; this.MinDeviceTelemetryLoopDuration = DEFAULT_MIN_DEVICE_TELEMETRY_LOOP_DURATION; this.MinDevicePropertiesLoopDuration = DEFAULT_MIN_DEVICE_PROPERTIES_LOOP_DURATION; @@ -207,6 +211,27 @@ public int MinDeviceTelemetryLoopDuration } } + /// + /// When sending telemetry for all the devices in a thread, slow down if the loop through + /// all the devices takes less than N msecs. This is also the minimum time between two + /// messages from the same device. + /// + public int MinDeviceReplayLoopDuration + { + get => this.minDeviceReplayLoopDuration; + set + { + if (value < 1 || value > MAX_LOOP_DURATION) + { + throw new InvalidConfigurationException( + "The min duration of the device telemetry loop is not valid. " + + "Use a value within the range of 1 and " + MAX_LOOP_DURATION); + } + + this.minDeviceReplayLoopDuration = value; + } + } + /// /// When writing device twins for all the devices in a thread, slow down if the loop through /// all the devices takes less than N msecs. diff --git a/Services/Models/DeviceModel.cs b/Services/Models/DeviceModel.cs index b003ee64..b4e7aefc 100644 --- a/Services/Models/DeviceModel.cs +++ b/Services/Models/DeviceModel.cs @@ -63,6 +63,7 @@ public DeviceModel() this.Protocol = IoTHubProtocol.AMQP; this.Simulation = new StateSimulation(); this.Properties = new Dictionary(); + this.Telemetry = new List(); this.CloudToDeviceMethods = new Dictionary(); } diff --git a/Services/Models/Simulation.cs b/Services/Models/Simulation.cs index 54a8d219..137ba9a2 100644 --- a/Services/Models/Simulation.cs +++ b/Services/Models/Simulation.cs @@ -136,10 +136,12 @@ public DateTimeOffset? EndTime [JsonProperty(Order = 150)] public DateTimeOffset? ActualStartTime { get; set; } - // ReplayFileId is the id of the replay file in storage + // ReplayFileId is the replay file data in storage [JsonProperty(Order = 160)] public string ReplayFileId { get; set; } + public bool ReplayFileRunIndefinitely { get; set; } + public Simulation() { // When unspecified, a simulation is enabled diff --git a/Services/ReplayFileService.cs b/Services/ReplayFileService.cs index 9aa84295..857f4093 100644 --- a/Services/ReplayFileService.cs +++ b/Services/ReplayFileService.cs @@ -9,8 +9,6 @@ using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime; using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Storage; using Newtonsoft.Json; -using Microsoft.VisualBasic.FileIO; -using FieldType = Microsoft.VisualBasic.FileIO.FieldType; namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.Services { @@ -39,13 +37,13 @@ public interface IReplayFileService public class ReplayFileService : IReplayFileService { + private const int NUM_CSV_COLS = 3; private readonly IEngine replayFilesStorage; private readonly ILogger log; public ReplayFileService( IServicesConfig config, IEngines engines, - IEngine storage, ILogger logger) { this.replayFilesStorage = engines.Build(config.ReplayFilesStorage); @@ -149,25 +147,26 @@ public string ValidateFile(Stream stream) { var reader = new StreamReader(stream); var file = reader.ReadToEnd(); - - using (TextFieldParser parser = new TextFieldParser(file)) + + while (!reader.EndOfStream) { - parser.TextFieldType = FieldType.Delimited; - parser.SetDelimiters(","); - while (!parser.EndOfData) + try { - try - { - string[] lines = parser.ReadFields(); - } - catch (MalformedLineException ex) + string line = reader.ReadLine(); + string[] fields = line.Split(','); + if (fields.Length < NUM_CSV_COLS) { - this.log.Error("Replay file has invalid csv format", () => new { ex }); - throw new InvalidInputException("Replay file has invalid csv format", ex); + this.log.Error("Replay file has invalid csv format"); + throw new InvalidInputException("Replay file has invalid csv format"); } } + catch (Exception ex) + { + this.log.Error("Error parsing replay file", () => new { ex }); + throw new InvalidInputException("Error parsing replay file", ex); + } } - + return file; } } diff --git a/Services/Services.csproj b/Services/Services.csproj index 3d02ba6b..a016db06 100644 --- a/Services/Services.csproj +++ b/Services/Services.csproj @@ -6,6 +6,12 @@ Microsoft.Azure.IoTSolutions.DeviceSimulation.Services + + Always + + + Always + Always diff --git a/Services/data/replayfile/replay.json b/Services/data/replayfile/replay.json new file mode 100644 index 00000000..da848191 --- /dev/null +++ b/Services/data/replayfile/replay.json @@ -0,0 +1,34 @@ +{ + "SchemaVersion": "1.0.0", + "Id": "replay", + "Version": "0.0.1", + "Name": "Replay", + "Description": "Fake device model for csv file replay", + "Protocol": "AMQP", + "ReplayFile": "", + "Simulation": { + "InitialState": { + "online": true + }, + "Interval": "00:00:00", + "Scripts": [ + ] + }, + "Properties": { + }, + "Tags": { + }, + "Telemetry": [ + { + "MessageTemplate": "", + "MessageSchema": { + "Name": "replay-sensors;v1", + "Format": "JSON", + "Fields": { + } + } + } + ], + "CloudToDeviceMethods": { + } +} \ No newline at end of file diff --git a/Services/data/replayfile/simulationReplayTest.csv b/Services/data/replayfile/simulationReplayTest.csv new file mode 100644 index 00000000..0af6d9ec --- /dev/null +++ b/Services/data/replayfile/simulationReplayTest.csv @@ -0,0 +1,3 @@ +telemetry, 00:00:00,{"temperature": 51.32, "temperature_unit": "fahrenheit", "humidity": 69.59, "humidity_unit":"RH", "pressure": 440.20, "pressure_unit": "psi"} +telemetry, 00:00:30,{"temperature": 71.32, "temperature_unit": "fahrenheit", "humidity": 79.59, "humidity_unit":"RH", "pressure": 240.20, "pressure_unit": "psi"} +telemetry, 00:01:00,{"temperature": 30.00, "temperature_unit": "fahrenheit", "humidity": 59.59, "humidity_unit":"RH", "pressure": 340.20, "pressure_unit": "psi"} \ No newline at end of file diff --git a/SimulationAgent.Test/SimulationManagerTest.cs b/SimulationAgent.Test/SimulationManagerTest.cs index 9a0b9896..c0a42db9 100644 --- a/SimulationAgent.Test/SimulationManagerTest.cs +++ b/SimulationAgent.Test/SimulationManagerTest.cs @@ -15,6 +15,7 @@ using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceConnection; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceProperties; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceState; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceTelemetry; using Moq; @@ -50,6 +51,7 @@ public class SimulationManagerTest private readonly ConcurrentDictionary mockDeviceContext; private readonly ConcurrentDictionary deviceTelemetryActors; private readonly ConcurrentDictionary devicePropertiesActors; + private readonly ConcurrentDictionary deviceReplayActors; private SimulationManager target; @@ -85,13 +87,15 @@ public SimulationManagerTest() this.mockDeviceContext = new ConcurrentDictionary(); this.deviceTelemetryActors = new ConcurrentDictionary(); this.devicePropertiesActors = new ConcurrentDictionary(); + this.deviceReplayActors = new ConcurrentDictionary(); this.target.InitAsync( simulation, this.deviceStateActors, this.mockDeviceContext, this.deviceTelemetryActors, - this.devicePropertiesActors).Wait(Constants.TEST_TIMEOUT); + this.devicePropertiesActors, + this.deviceReplayActors).Wait(Constants.TEST_TIMEOUT); } [Fact] diff --git a/SimulationAgent.Test/SimulationThreads/DeviceReplayTaskTest.cs b/SimulationAgent.Test/SimulationThreads/DeviceReplayTaskTest.cs new file mode 100644 index 00000000..72f6fa7a --- /dev/null +++ b/SimulationAgent.Test/SimulationThreads/DeviceReplayTaskTest.cs @@ -0,0 +1,132 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Concurrent; +using System.Threading; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.SimulationThreads; +using Moq; +using Xunit; +using System.Threading.Tasks; + +namespace SimulationAgent.Test.SimulationThreads +{ + public class DeviceReplayTaskTest + { + private const int NUM_ACTORS = 9; + private const int MAX_PENDING_TASKS = 5; + + private readonly Mock mockAppConcurrencyConfig; + private readonly Mock mockLogger; + private readonly DeviceReplayTask target; + private readonly ConcurrentDictionary> mockDeviceReplayActors; + private readonly ConcurrentDictionary mockDeviceReplayActorObjects; + private readonly ConcurrentDictionary> mockSimulationManagers; + private readonly ConcurrentDictionary mockSimulationManagerObjects; + + public DeviceReplayTaskTest() + { + this.mockDeviceReplayActors = new ConcurrentDictionary>(); + this.mockDeviceReplayActorObjects = new ConcurrentDictionary(); + this.mockSimulationManagers = new ConcurrentDictionary>(); + this.mockSimulationManagerObjects = new ConcurrentDictionary(); + + this.mockAppConcurrencyConfig = new Mock(); + this.mockAppConcurrencyConfig.SetupGet(x => x.MaxPendingTasks).Returns(MAX_PENDING_TASKS); + this.mockLogger = new Mock(); + + this.target = new DeviceReplayTask(this.mockAppConcurrencyConfig.Object, this.mockLogger.Object); + } + + [Fact] + public void ItCallsRunAsyncOnAllReplayActors() + { + // Arrange + var cancellationToken = new CancellationTokenSource(); + + this.BuildMockDeviceReplayActors( + this.mockDeviceReplayActors, + this.mockDeviceReplayActorObjects, + cancellationToken, + NUM_ACTORS); + + // Build a list of SimulationManagers + this.BuildMockSimluationManagers( + this.mockSimulationManagers, + this.mockSimulationManagerObjects, + cancellationToken, + NUM_ACTORS); + + // Act + // Act on the target. The cancellation token will be cancelled through + // a callback that will be triggered when each device-replay actor + // is called. + var targetTask = this.target.RunAsync( + this.mockSimulationManagerObjects, + this.mockDeviceReplayActorObjects, + cancellationToken.Token); + + // Assert + // Verify that each SimulationManager was called at least once + foreach (var actor in this.mockDeviceReplayActors) + actor.Value.Verify(x => x.HasWorkToDo(), Times.Once); + } + + private void BuildMockDeviceReplayActors( + ConcurrentDictionary> mockDictionary, + ConcurrentDictionary objectDictionary, + CancellationTokenSource cancellationToken, + int count) + { + mockDictionary.Clear(); + objectDictionary.Clear(); + + for (int i = 0; i < count; i++) + { + var deviceName = $"device_{i}"; + var mockDeviceReplayActor = new Mock(); + + // Have each DeviceReplayActor report that it has work to do + mockDeviceReplayActor.Setup(x => x.HasWorkToDo()).Returns(true); + mockDeviceReplayActor.Setup(x => x.RunAsync()).Returns(Task.CompletedTask) + .Callback(() => { cancellationToken.Cancel(); }); + + mockDictionary.TryAdd(deviceName, mockDeviceReplayActor); + objectDictionary.TryAdd(deviceName, mockDeviceReplayActor.Object); + } + } + + /* + * Creating two collections: one for the mocks, and another to store the + * mock objects. If we only created one collection and populated it with + * the mock objects, we wouldn't have a reference to the backing mock for + * each. + */ + private void BuildMockSimluationManagers( + ConcurrentDictionary> mockSimulationManagers, + ConcurrentDictionary mockSimulationManagerObjects, + CancellationTokenSource cancellationToken, + int count) + { + mockSimulationManagers.Clear(); + mockSimulationManagerObjects.Clear(); + + for (int i = 0; i < count; i++) + { + var deviceName = $"simulation_{i}"; + var mockSimulationManager = new Mock(); + + // We only want the main loop in the target to run once, so here we'll + // trigger a callback which will cancel the cancellation token that + // the main loop uses. + mockSimulationManager.Setup(x => x.NewConnectionLoop()) + .Callback(() => cancellationToken.Cancel()); + + mockSimulationManagers.TryAdd(deviceName, mockSimulationManager); + mockSimulationManagerObjects.TryAdd(deviceName, mockSimulationManager.Object); + } + } + } +} diff --git a/SimulationAgent/Agent.cs b/SimulationAgent/Agent.cs index 79180ed1..216967e1 100644 --- a/SimulationAgent/Agent.cs +++ b/SimulationAgent/Agent.cs @@ -16,6 +16,7 @@ using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceProperties; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceState; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceTelemetry; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.SimulationThreads; namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent @@ -80,6 +81,10 @@ public class Agent : ISimulationAgent private Thread[] devicesTelemetryThreads; private List devicesTelemetryTasks; + // The thread responsible for replaying simulations from a file + private Thread deviceReplayThread; + private IDeviceReplayTask deviceReplayTask; + // List of simulation managers, one for each simulation private readonly ConcurrentDictionary simulationManagers; @@ -95,6 +100,9 @@ public class Agent : ISimulationAgent // Contains all the actors sending device property updates to Azure IoT Hub, indexed by Simulation ID + Device ID (string concat) private readonly ConcurrentDictionary devicePropertiesActors; + // Contains all the actors sending device replay updates to Azure IoT Hub, indexed by Simulation ID + Device ID (string concat) + private readonly ConcurrentDictionary deviceReplayActors; + // Flag signaling whether the simulation is starting (to reduce blocked threads) private bool startingOrStopping; @@ -131,6 +139,7 @@ public Agent( this.deviceConnectionActors = new ConcurrentDictionary(); this.deviceTelemetryActors = new ConcurrentDictionary(); this.devicePropertiesActors = new ConcurrentDictionary(); + this.deviceReplayActors = new ConcurrentDictionary(); } public Task StartAsync(CancellationToken appStopToken) @@ -314,7 +323,8 @@ await manager.InitAsync( this.deviceStateActors, this.deviceConnectionActors, this.deviceTelemetryActors, - this.devicePropertiesActors); + this.devicePropertiesActors, + this.deviceReplayActors); this.simulationManagers[simulation.Id] = manager; @@ -394,6 +404,13 @@ private void TryToStartThreads() this.runningToken.Token)); } + this.deviceReplayTask = this.factory.Resolve(); + this.deviceReplayThread = new Thread( + () => this.deviceReplayTask.RunAsync( + this.simulationManagers, + this.deviceReplayActors, + this.runningToken.Token)); + // State try { @@ -467,6 +484,19 @@ private void TryToStartThreads() this.logDiagnostics.LogServiceError(msg, e); throw new Exception("Unable to start the device-telemetry threads", e); } + + // Replay + try + { + this.deviceReplayThread.Start(); + } + catch (Exception e) + { + var msg = "Unable to start the device-replay thread"; + this.log.Error(msg, e); + this.logDiagnostics.LogServiceError(msg, e); + throw new Exception("Unable to start the device-replay thread", e); + } } private void TryToStopThreads() @@ -520,6 +550,16 @@ private void TryToStopThreads() this.log.Warn("Unable to stop the telemetry thread in a clean way", () => new { threadNumber = i, e }); } } + + // Replay + try + { + this.deviceReplayThread.Interrupt(); + } + catch (Exception e) + { + this.log.Warn("Unable to stop the replay thread in a clean way", e); + } } private void SendSolutionHeartbeat() diff --git a/SimulationAgent/DeviceReplay/DeviceReplayActor.cs b/SimulationAgent/DeviceReplay/DeviceReplayActor.cs new file mode 100644 index 00000000..c46bad0c --- /dev/null +++ b/SimulationAgent/DeviceReplay/DeviceReplayActor.cs @@ -0,0 +1,227 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.IO; +using System.Threading.Tasks; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.DataStructures; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Runtime; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Storage; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceConnection; +using static Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Models.DeviceModel; + +namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay +{ + public interface IDeviceReplayActor + { + Task Init(ISimulationContext simulationContext, + string deviceId, + DeviceModel deviceModel, + IDeviceConnectionActor context); + + bool HasWorkToDo(); + Task RunAsync(); + void Stop(); + } + + public class DeviceReplayActor : IDeviceReplayActor + { + private enum ActorStatus + { + None, + ReadLine, + LineReady, + Stopped, + FileEnd, + Restart + } + + private readonly ILogger log; + private readonly IActorsLogger actorLogger; + private ISimulationContext simulationContext; + private DeviceModel deviceModel; + private readonly IInstance instance; + private readonly IReplayFileService replayFileService; + + private ActorStatus status; + private string deviceId; + private string currentLine; + private string file; + private StringReader fileReader; + private long whenToRun; + private long prevInterval; + private IDeviceConnectionActor deviceContext; + private DeviceModelMessageSchema emptySchema; + + private static long Now => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + + // Replay file constants + private const string TELEMETRY_TYPE = "telemetry"; + private const int NUM_CSV_COLS = 3; + private const int MS_HOUR = 3600000; + private const int MS_MINUTE = 60000; + private const int MS_SECOND = 1000; + + public DeviceReplayActor( + ILogger logger, + IActorsLogger actorLogger, + IServicesConfig config, + IEngines engines, + IInstance instance) + { + this.log = logger; + this.actorLogger = actorLogger; + this.instance = instance; + + this.status = ActorStatus.None; + this.deviceModel = null; + this.currentLine = ""; + this.whenToRun = 0; + this.prevInterval = 0; + this.emptySchema = new DeviceModelMessageSchema(); + this.replayFileService = new ReplayFileService(config, engines, logger); + } + + /// + /// Invoke this method before calling Execute(), to initialize the actor + /// with details like the device model and message type to simulate. + /// + public async Task Init(ISimulationContext simulationContext, + string deviceId, + DeviceModel deviceModel, + IDeviceConnectionActor context) + { + this.instance.InitOnce(); + + this.simulationContext = simulationContext; + this.deviceModel = deviceModel; + this.deviceId = deviceId; + this.deviceContext = context; + this.actorLogger.Init(deviceId, "Replay"); + + string fileId = simulationContext.ReplayFileId; + try + { + if (!string.IsNullOrEmpty(fileId)) + { + var data = await this.replayFileService.GetAsync(fileId); + this.file = data.Content; + this.fileReader = new StringReader(this.file); + this.status = ActorStatus.ReadLine; + } + } + catch (Exception e) + { + this.log.Error("Failed to read line", () => new { this.deviceId, e }); + } + + this.instance.InitComplete(); + } + + public bool HasWorkToDo() + { + if (Now < this.whenToRun) return false; + + if (!this.deviceContext.Connected) return false; + + switch (this.status) + { + case ActorStatus.ReadLine: + case ActorStatus.LineReady: + case ActorStatus.Restart: + return true; + } + + return false; + } + + public async Task RunAsync() + { + switch (this.status) + { + case ActorStatus.ReadLine: + this.ReadLine(); + break; + + case ActorStatus.LineReady: + this.SendTelemetry(); + break; + + case ActorStatus.Restart: + // Rewind the stream to the beginning + this.fileReader.Dispose(); + this.fileReader = new StringReader(this.file); + this.status = ActorStatus.ReadLine; + break; + } + } + + public void Stop() + { + this.log.Debug("Device replay actor stopped", + () => new { this.deviceId, Status = this.status.ToString() }); + + // Discard file reader resources + this.fileReader.Dispose(); + + this.status = ActorStatus.Stopped; + } + + private async void SendTelemetry() + { + try + { + await this.deviceContext.Client.SendMessageAsync(this.currentLine, this.emptySchema); + this.status = ActorStatus.ReadLine; + this.log.Debug("Sending message", () => new { this.deviceId }); + } + catch (Exception e) + { + this.Stop(); + this.log.Error("Failed to send message", () => new { this.deviceId, e }); + } + } + + private void ReadLine() + { + try + { + this.currentLine = this.fileReader.ReadLine(); + if (this.currentLine == null) + { + if (this.simulationContext.ReplayFileIndefinitely) + { + this.status = ActorStatus.Restart; + } + else + { + this.Stop(); + } + } + else + { + // Check for incorrectly formed csv + var values = this.currentLine.Split(','); + if (values.Length >= NUM_CSV_COLS && values[0] == TELEMETRY_TYPE) // Only send telemetry + { + var intervals = values[1].Split(':'); + var msInterval = (long.Parse(intervals[0]) * MS_HOUR) + + (long.Parse(intervals[1]) * MS_MINUTE) + + (long.Parse(intervals[2]) * MS_SECOND); + this.currentLine = String.Join("", values, NUM_CSV_COLS - 1, values.Length - NUM_CSV_COLS - 1); + this.whenToRun = Now + msInterval - this.prevInterval; + this.prevInterval = msInterval; + this.status = ActorStatus.LineReady; + } + } + } + catch (Exception e) + { + this.Stop(); + this.log.Error("Failed to read line", () => new { this.deviceId, e }); + } + } + } +} diff --git a/SimulationAgent/SimulationContext.cs b/SimulationAgent/SimulationContext.cs index d64b9e10..bf509ed7 100644 --- a/SimulationAgent/SimulationContext.cs +++ b/SimulationAgent/SimulationContext.cs @@ -13,6 +13,8 @@ namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent public interface ISimulationContext { IRateLimiting RateLimiting { get; } + string ReplayFileId { get; } + bool ReplayFileIndefinitely { get; } IDevices Devices { get; } ConnectionLoopSettings ConnectionLoopSettings { get; } PropertiesLoopSettings PropertiesLoopSettings { get; } @@ -28,6 +30,8 @@ public class SimulationContext : ISimulationContext, IDisposable { // Note: this applies to a single hub; simulations with multiple hubs are not supported yet public IRateLimiting RateLimiting { get; } + public string ReplayFileId { get; private set; } + public bool ReplayFileIndefinitely { get; private set; } public IDevices Devices { get; } public ConnectionLoopSettings ConnectionLoopSettings { get; private set; } @@ -51,6 +55,8 @@ public async Task InitAsync(Simulation simulation) var rateLimits = simulation.RateLimits; this.RateLimiting.Init(rateLimits); + this.ReplayFileId = simulation.ReplayFileId; + this.ReplayFileIndefinitely = simulation.ReplayFileRunIndefinitely; this.ConnectionLoopSettings = new ConnectionLoopSettings(rateLimits); this.PropertiesLoopSettings = new PropertiesLoopSettings(rateLimits); diff --git a/SimulationAgent/SimulationManager.cs b/SimulationAgent/SimulationManager.cs index 54b26c11..526453e3 100644 --- a/SimulationAgent/SimulationManager.cs +++ b/SimulationAgent/SimulationManager.cs @@ -16,6 +16,7 @@ using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceProperties; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceState; using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceTelemetry; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay; using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Statistics; namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent @@ -27,7 +28,8 @@ Task InitAsync( ConcurrentDictionary deviceStateActors, ConcurrentDictionary deviceConnectionActors, ConcurrentDictionary deviceTelemetryActors, - ConcurrentDictionary devicePropertiesActors); + ConcurrentDictionary devicePropertiesActors, + ConcurrentDictionary deviceReplayActors); // === BEGIN - Executed by Agent.RunAsync @@ -81,6 +83,7 @@ public class SimulationManager : ISimulationManager private ConcurrentDictionary deviceConnectionActors; private ConcurrentDictionary deviceTelemetryActors; private ConcurrentDictionary devicePropertiesActors; + private ConcurrentDictionary deviceReplayActors; // List of the device partitions assigned to this node, including their content // in case they disappear from storage, used also to reduce storage lookups @@ -123,7 +126,8 @@ public async Task InitAsync( ConcurrentDictionary deviceStateActors, ConcurrentDictionary deviceConnectionActors, ConcurrentDictionary deviceTelemetryActors, - ConcurrentDictionary devicePropertiesActors) + ConcurrentDictionary devicePropertiesActors, + ConcurrentDictionary deviceReplayActors) { this.instance.InitOnce(); @@ -134,6 +138,7 @@ public async Task InitAsync( this.deviceConnectionActors = deviceConnectionActors; this.deviceTelemetryActors = deviceTelemetryActors; this.devicePropertiesActors = devicePropertiesActors; + this.deviceReplayActors = deviceReplayActors; this.instance.InitComplete(); } @@ -247,6 +252,7 @@ public async Task SaveStatisticsAsync() var connectionActors = this.deviceConnectionActors.Where(a => a.Key.StartsWith(prefix)).ToList(); var propertiesActors = this.devicePropertiesActors.Where(a => a.Key.StartsWith(prefix)).ToList(); var stateActors = this.deviceStateActors.Where(a => a.Key.StartsWith(prefix)).ToList(); + var replayActors = this.deviceReplayActors.Where(a => a.Key.StartsWith(prefix)).ToList(); var simulationModel = new SimulationStatisticsModel { @@ -255,6 +261,7 @@ public async Task SaveStatisticsAsync() FailedMessages = telemetryActors.Sum(a => a.Value.FailedMessagesCount), FailedDeviceConnections = connectionActors.Sum(a => a.Value.FailedDeviceConnectionsCount), FailedDevicePropertiesUpdates = propertiesActors.Sum(a => a.Value.FailedTwinUpdatesCount), + /* TODO: Add replay actors stats */ }; await this.simulationStatistics.CreateOrUpdateAsync(this.simulation.Id, simulationModel); @@ -277,6 +284,7 @@ public void TearDown() this.DeleteAllConnectionActors(); this.DeleteAllTelemetryActors(); this.DeleteAllPropertiesActors(); + this.DeleteAllReplayActors(); } // Check if the cluster size has changed and act accordingly @@ -399,6 +407,24 @@ private void DeleteAllPropertiesActors() toRemove.ForEach(x => this.devicePropertiesActors.Remove(x, out _)); } + private void DeleteAllReplayActors() + { + var prefix = this.GetDictKey(string.Empty); + + var toRemove = new List(); + foreach (var actor in this.deviceReplayActors) + { + // TODO: make this simpler, e.g. store the list of keys + if (actor.Key.StartsWith(prefix)) + { + actor.Value.Stop(); + toRemove.Add(actor.Key); + } + } + + toRemove.ForEach(x => this.deviceReplayActors.Remove(x, out _)); + } + private int DeleteActorsForPartition(DevicesPartition partition) { var count = 0; @@ -428,6 +454,7 @@ private void DeleteDeviceActors(string deviceId) this.deviceStateActors.TryRemove(dictKey, out _); this.deviceConnectionActors.TryRemove(dictKey, out _); this.devicePropertiesActors.TryRemove(dictKey, out _); + this.deviceReplayActors.TryRemove(dictKey, out _); var toRemove = new List(); foreach (var actor in this.deviceTelemetryActors) @@ -499,7 +526,7 @@ private async Task TryToCreateActorsForPartitionAsync(DevicesPartition par /** * For each device create one actor to periodically update the internal state, * one actor to manage the connection to the hub, and one actor for each - * telemetry message to send. + * telemetry message to send. ...................... */ private void CreateActorsForDevice(string deviceId, DeviceModel deviceModel, int deviceCounter) { @@ -508,38 +535,58 @@ private void CreateActorsForDevice(string deviceId, DeviceModel deviceModel, int var dictKey = this.GetDictKey(deviceId); - // Create one state actor for each device - var deviceStateActor = this.factory.Resolve(); - deviceStateActor.Init(this.simulationContext, deviceId, deviceModel, deviceCounter); - this.deviceStateActors.AddOrUpdate(dictKey, deviceStateActor, (k, v) => deviceStateActor); - - // Create one connection actor for each device - var deviceContext = this.factory.Resolve(); - deviceContext.Init(this.simulationContext, deviceId, deviceModel, deviceStateActor, this.simulationContext.ConnectionLoopSettings); - this.deviceConnectionActors.AddOrUpdate(dictKey, deviceContext, (k, v) => deviceContext); - - // Create one device properties actor for each device - var devicePropertiesActor = this.factory.Resolve(); - devicePropertiesActor.Init(this.simulationContext, deviceId, deviceStateActor, deviceContext, this.simulationContext.PropertiesLoopSettings); - this.devicePropertiesActors.AddOrUpdate(dictKey, devicePropertiesActor, (k, v) => devicePropertiesActor); - - // Create one telemetry actor for each telemetry message to be sent - var i = 0; - foreach (var message in deviceModel.Telemetry) + // Create device actors for either replay or non-replay simulations + if (string.IsNullOrEmpty(this.simulation.ReplayFileId)) { - // Skip telemetry without an interval set - if (message.Interval.TotalMilliseconds <= 0) + // Create one state actor for each device + var deviceStateActor = this.factory.Resolve(); + deviceStateActor.Init(this.simulationContext, deviceId, deviceModel, deviceCounter); + this.deviceStateActors.AddOrUpdate(dictKey, deviceStateActor, (k, v) => deviceStateActor); + + // Create one connection actor for each device + var deviceContext = this.factory.Resolve(); + deviceContext.Init(this.simulationContext, deviceId, deviceModel, deviceStateActor, this.simulationContext.ConnectionLoopSettings); + this.deviceConnectionActors.AddOrUpdate(dictKey, deviceContext, (k, v) => deviceContext); + + // Create one device properties actor for each device + var devicePropertiesActor = this.factory.Resolve(); + devicePropertiesActor.Init(this.simulationContext, deviceId, deviceStateActor, deviceContext, this.simulationContext.PropertiesLoopSettings); + this.devicePropertiesActors.AddOrUpdate(dictKey, devicePropertiesActor, (k, v) => devicePropertiesActor); + + // Create one telemetry actor for each telemetry message to be sent + var i = 0; + foreach (var message in deviceModel.Telemetry) { - this.log.Warn("Skipping telemetry with interval = 0", - () => new { model = deviceModel.Id, message }); - continue; - } + // Skip telemetry without an interval set + if (message.Interval.TotalMilliseconds <= 0) + { + this.log.Warn("Skipping telemetry with interval = 0", + () => new { model = deviceModel.Id, message }); + continue; + } - var deviceTelemetryActor = this.factory.Resolve(); - deviceTelemetryActor.Init(this.simulationContext, deviceId, deviceModel, message, deviceStateActor, deviceContext); + var deviceTelemetryActor = this.factory.Resolve(); + deviceTelemetryActor.Init(this.simulationContext, deviceId, deviceModel, message, deviceStateActor, deviceContext); - var actorKey = this.GetTelemetryDictKey(dictKey, (i++).ToString()); - this.deviceTelemetryActors.AddOrUpdate(actorKey, deviceTelemetryActor, (k, v) => deviceTelemetryActor); + var actorKey = this.GetTelemetryDictKey(dictKey, (i++).ToString()); + this.deviceTelemetryActors.AddOrUpdate(actorKey, deviceTelemetryActor, (k, v) => deviceTelemetryActor); + } + } + else { + // Create one state actor for each device + var deviceStateActor = this.factory.Resolve(); + deviceStateActor.Init(this.simulationContext, deviceId, deviceModel, deviceCounter); + this.deviceStateActors.AddOrUpdate(dictKey, deviceStateActor, (k, v) => deviceStateActor); + + // Create one connection actor for each device + var deviceContext = this.factory.Resolve(); + deviceContext.Init(this.simulationContext, deviceId, deviceModel, deviceStateActor, this.simulationContext.ConnectionLoopSettings); + this.deviceConnectionActors.AddOrUpdate(dictKey, deviceContext, (k, v) => deviceContext); + + // Create one device replay actor for each device + var deviceReplayActor = this.factory.Resolve(); + deviceReplayActor.Init(this.simulationContext, deviceId, deviceModel, deviceContext); + this.deviceReplayActors.AddOrUpdate(dictKey, deviceReplayActor, (k, v) => deviceReplayActor); } } diff --git a/SimulationAgent/SimulationThreads/DeviceReplayTask.cs b/SimulationAgent/SimulationThreads/DeviceReplayTask.cs new file mode 100644 index 00000000..ed90380d --- /dev/null +++ b/SimulationAgent/SimulationThreads/DeviceReplayTask.cs @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Concurrency; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.Services.Diagnostics; +using Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.DeviceReplay; + +namespace Microsoft.Azure.IoTSolutions.DeviceSimulation.SimulationAgent.SimulationThreads +{ + public interface IDeviceReplayTask + { + Task RunAsync( + ConcurrentDictionary simulationManagers, + ConcurrentDictionary replayActors, + CancellationToken runningToken + ); + } + + public class DeviceReplayTask : IDeviceReplayTask + { + // Global settings, not affected by hub SKU or simulation settings + private readonly IAppConcurrencyConfig appConcurrencyConfig; + + private readonly ILogger log; + + public DeviceReplayTask( + IAppConcurrencyConfig appConcurrencyConfig, + ILogger logger) + { + this.appConcurrencyConfig = appConcurrencyConfig; + this.log = logger; + } + + public async Task RunAsync( + ConcurrentDictionary simulationManagers, + ConcurrentDictionary replayActors, + CancellationToken runningToken) + { + var tasks = new List(); + + while (!runningToken.IsCancellationRequested) + { + foreach (var actor in replayActors) { + if (actor.Value.HasWorkToDo()) { + tasks.Add(actor.Value.RunAsync()); + } + } + + var before = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + + // Wait for any pending tasks. + if (tasks.Count > 0) + { + await Task.WhenAll(tasks); + tasks.Clear(); + } + + var durationMsecs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - before; + this.log.Debug("Device-replay loop completed", () => new { durationMsecs }); + this.SlowDownIfTooFast(durationMsecs, 1000); + } + } + + private void SlowDownIfTooFast(long duration, int min) + { + // Avoid sleeping for only one millisecond + if (duration >= min || min - duration <= 1) return; + + var pauseMsecs = min - (int) duration; + this.log.Debug("Pausing device-replay thread", () => new { pauseMsecs }); + Thread.Sleep(pauseMsecs); + } + } +} diff --git a/WebService.Test/WebService.Test.csproj b/WebService.Test/WebService.Test.csproj old mode 100755 new mode 100644 diff --git a/WebService/Startup.cs b/WebService/Startup.cs old mode 100755 new mode 100644 diff --git a/WebService/v1/Models/SimulationApiModel/SimulationApiModel.cs b/WebService/v1/Models/SimulationApiModel/SimulationApiModel.cs index 7de13ed6..25e0e300 100644 --- a/WebService/v1/Models/SimulationApiModel/SimulationApiModel.cs +++ b/WebService/v1/Models/SimulationApiModel/SimulationApiModel.cs @@ -73,6 +73,12 @@ public class SimulationApiModel // Note: read-only property, used only to report the simulation status [JsonProperty(PropertyName = "Statistics")] public SimulationStatistics Statistics { get; set; } + + [JsonProperty(PropertyName = "ReplayFileId")] + public string ReplayFileId { get; set; } + + [JsonProperty(PropertyName = "ReplayFileIndefinitely")] + public bool ReplayFileIndefinitely { get; set; } [JsonProperty(PropertyName = "RateLimits")] public SimulationRateLimits RateLimits { get; set; } @@ -136,6 +142,8 @@ public Simulation ToServiceModel( result.EndTime = DateHelper.ParseDateExpression(this.EndTime, now); result.DeviceModels = this.DeviceModels?.Select(x => x.ToServiceModel()).ToList(); result.RateLimits = this.RateLimits.ToServiceModel(defaultRateLimits); + result.ReplayFileId = this.ReplayFileId; + result.ReplayFileRunIndefinitely = this.ReplayFileIndefinitely; // Overwrite the value only if the request included the field, i.e. don't // enable/disable the simulation if the user didn't explicitly ask to. @@ -182,7 +190,10 @@ public static SimulationApiModel FromServiceModel( DeleteDevicesOnce = value.DeleteDevicesOnce, DevicesDeletionComplete = value.DevicesDeletionComplete, DeleteDevicesWhenSimulationEnds = value.DeleteDevicesWhenSimulationEnds, - IotHubs = new List() + IotHubs = new List(), + ReplayFileId = value.ReplayFileId, + ReplayFileIndefinitely = value.ReplayFileRunIndefinitely + }; foreach (var iotHubConnectionString in value.IotHubConnectionStrings)