Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced MessagesModel.registerHandler with a stream #168

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/src/data/metrics/vitals.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class VitalsMetrics extends Metrics {
}
}

/// Publicly exposes [notifyListeners].
void notify() => notifyListeners();

@override
List<MetricLine> get allMetrics => [
MetricLine("Voltage: ${drive.batteryVoltage.toStringAsFixed(2)} V", severity: voltageSeverity),
Expand Down
9 changes: 0 additions & 9 deletions lib/src/data/protobuf.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ export "package:protobuf/protobuf.dart" show GeneratedMessageGenericExtensions;
/// The `.fromBuffer` constructor is a type of [MessageDecoder].
typedef MessageDecoder<T extends Message> = T Function(List<int> data);

/// A callback to execute with a specific serialized Protobuf message.
typedef MessageHandler<T extends Message> = void Function(T);

/// A callback to handle any [WrappedMessage].
typedef WrappedMessageHandler = void Function(WrappedMessage);

/// A callback to execute with raw Protobuf data.
typedef RawDataHandler = void Function(List<int> data);

/// Gets the name of the command message for the given device.
String getCommandName(Device device) => switch (device) {
Device.ARM => "ArmCommand",
Expand Down
6 changes: 5 additions & 1 deletion lib/src/models/data/logs.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ class LogsModel extends Model {

@override
Future<void> init() async {
models.messages.registerHandler<BurtLog>(name: BurtLog().messageName, decoder: BurtLog.fromBuffer, handler: handleLog);
models.messages.stream.onMessage<BurtLog>(
name: BurtLog().messageName,
constructor: BurtLog.fromBuffer,
callback: handleLog,
);
saveToFileTimer = Timer.periodic(saveToFileInterval, saveToFile);
}

Expand Down
79 changes: 24 additions & 55 deletions lib/src/models/data/messages.dart
Original file line number Diff line number Diff line change
@@ -1,34 +1,23 @@
import "package:protobuf/protobuf.dart";
import "dart:async";

import "package:rover_dashboard/data.dart";
import "package:rover_dashboard/models.dart";

/// A mixin that delegates [WrappedMessage]s to a handler via [registerHandler].
///
/// - Use [registerHandler] to invoke your handler whenever a new message is received.
/// - Use [removeHandler] to remove your handler.
/// - Override [allowedFallthrough] to allow certain massages to pass unhandled.
class MessagesModel {
/// A set of message types that are allowed to pass through without being handled.
static const Set<String> allowedFallthrough = {"AutonomyData", "Disconnect"};

/// A set of handlers to be called based on [WrappedMessage.name].
final Map<String, RawDataHandler> _handlers = {};

/// Delegates the message contents to the appropriate handler.
void onMessage(WrappedMessage wrapper) {
final rawHandler = _handlers[wrapper.name];
if (rawHandler == null) {
if (allowedFallthrough.contains(wrapper.name)) return;
throw StateError("No handler registered for ${wrapper.name} message");
}
try {
return rawHandler(wrapper.data);
} on InvalidProtocolBufferException {
// Data is corrupt, ignore it
}
}
export "package:burt_network/burt_network.dart" show WrappedMessageStream;

/// A single model to consolidate all messages.
///
/// Messages can arrive from serial devices or UDP devices, and any device can be unexpectedly
/// disconnected at any time. To simplify the logic of subscribing for new messages, this model
/// holds a [stream] of [WrappedMessage]s that anyone can subscribe to. When a message arrives,
/// simply call [addMessage] to ensure it will be added to the stream.
///
/// Note that having this model forward [stream] to the serial and UDP streams would *not* work,
/// as those streams can be closed when devices are disconnected, and new streams are created when
/// devices are connected for the first time. In that case, anyone who subscribes to the stream
/// before a device is connected (eg, in [Model.init]) won't get messages received afterwards. To
/// get around this issue, this model uses the same [StreamController] the entire time.
class MessagesModel extends Model {
/// Sends a command over the network or over Serial.
void sendMessage(Message message, {bool checkVersion = true}) {
final shouldCheck = checkVersion && models.settings.dashboard.versionChecking;
Expand All @@ -42,34 +31,14 @@ class MessagesModel {
models.sockets.data.sendMessage(message);
}

/// Adds a handler for the given message type.
///
/// When a new message is received, [onMessage] checks to see if its type matches [name].
/// If so, it calls [decoder] to parse the binary data into a Protobuf class, and then
/// calls [handler] with that parsed data class.
///
/// For example, with a message called `ScienceData`, you would use this function as:
/// ```dart
/// registerHandler<ScienceData>(
/// name: ScienceData().messageName, // identify the data as a ScienceData message
/// decoder: ScienceData.fromBuffer, // deserialize into a ScienceData instance
/// handler: (ScienceData data) => print(data.methane), // do something with the data
/// );
/// ```
void registerHandler<T extends Message>({
required String name,
required MessageDecoder<T> decoder,
required MessageHandler<T> handler,
}) {
if (_handlers.containsKey(name)) { // handler was already registered
throw ArgumentError("There's already a message handler for $name.");
} else {
_handlers[name] = (data) => handler(decoder(data));
}
}
final _controller = StreamController<WrappedMessage>.broadcast();

/// The stream of messages. Use [WrappedMessageStream.onMessage] to subscribe to messages.
Stream<WrappedMessage> get stream => _controller.stream;

/// Adds a message to the [stream].
void addMessage(WrappedMessage message) => _controller.add(message);

/// Removes the handler for a given message type.
///
/// This is useful if you register a handler to update a piece of UI that is no longer on-screen.
void removeHandler(String name) => _handlers.remove(name);
@override
Future<void> init() async { }
}
4 changes: 2 additions & 2 deletions lib/src/models/data/serial.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import "package:rover_dashboard/models.dart";
/// names instead.
///
/// Send messages to the connected devices using the [sendMessage] method, and all messages
/// received all ports are forwarded to [MessagesModel.onMessage].
/// received all ports are forwarded to [MessagesModel.addMessage].
class SerialModel extends Model {
/// All the connected devices and their respective serial ports.
///
Expand All @@ -37,7 +37,7 @@ class SerialModel extends Model {
models.home.setMessage(severity: Severity.error, text: "Could not connect to $port");
return;
}
device.messages.listen(models.messages.onMessage);
device.messages.listen(models.messages.addMessage);
models.home.setMessage(severity: Severity.info, text: "Connected to $port");
devices[port] = device;
notifyListeners();
Expand Down
2 changes: 1 addition & 1 deletion lib/src/models/data/sockets.dart
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Sockets extends Model {
? onConnect(socket.device)
: onDisconnect(socket.device),
);
socket.messages.listen(models.messages.onMessage);
socket.messages.listen(models.messages.addMessage);
await socket.init();
}
final level = Logger.level;
Expand Down
32 changes: 16 additions & 16 deletions lib/src/models/data/video.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ class VideoModel extends Model {
};

/// How many frames came in the network in the past second.
///
///
/// This number is updated every frame. Use [networkFps] in the UI.
Map<CameraName, int> framesThisSecond = {
for (final name in CameraName.values)
for (final name in CameraName.values)
name: 0,
};

/// How many frames came in the network in the past second.
Map<CameraName, int> networkFps = {};

/// Triggers when it's time to update a new frame.
///
///
/// This is kept here to ensure all widgets are in sync.
Timer? frameUpdater;

Expand All @@ -40,15 +40,15 @@ class VideoModel extends Model {

@override
Future<void> init() async {
models.messages.registerHandler<VideoData>(
models.messages.stream.onMessage<VideoData>(
name: VideoData().messageName,
decoder: VideoData.fromBuffer,
handler: handleData,
constructor: VideoData.fromBuffer,
callback: handleData,
);
models.messages.registerHandler<VideoCommand>(
models.messages.stream.onMessage<VideoCommand>(
name: VideoCommand().messageName,
decoder: VideoCommand.fromBuffer,
handler: (command) => _handshake = command,
constructor: VideoCommand.fromBuffer,
callback: (command) => _handshake = command,
);
fpsTimer = Timer.periodic(const Duration(seconds: 1), resetNetworkFps);
reset();
Expand All @@ -58,7 +58,7 @@ class VideoModel extends Model {
void resetNetworkFps([_]) {
networkFps = Map.from(framesThisSecond);
framesThisSecond = {
for (final name in CameraName.values)
for (final name in CameraName.values)
name: 0,
};
notifyListeners();
Expand Down Expand Up @@ -110,13 +110,13 @@ class VideoModel extends Model {
/// Takes a screenshot of the current frame.
Future<void> saveFrame(CameraName name) async {
final cachedFrame = feeds[name]?.frame;
if (cachedFrame == null) throw ArgumentError.notNull("Feed for $name");
if (cachedFrame == null) throw ArgumentError.notNull("Feed for $name");
await services.files.writeImage(cachedFrame, name.humanName);
models.home.setMessage(severity: Severity.info, text: "Screenshot saved");
}

/// Updates settings for the given camera.
Future<void> updateCamera(String id, CameraDetails details, {bool verify = true}) async {
Future<void> updateCamera(String id, CameraDetails details, {bool verify = true}) async {
_handshake = null;
final command = VideoCommand(id: id, details: details);
models.sockets.video.sendMessage(command);
Expand All @@ -126,7 +126,7 @@ class VideoModel extends Model {
}

/// Enables or disables the given camera.
///
///
/// This function is called automatically, so if the camera is not connected or otherwise available,
/// it'll fail silently. However, if the server simply doesn't respond, it'll show a warning.
Future<void> toggleCamera(CameraName name, {required bool enable}) async {
Expand All @@ -141,16 +141,16 @@ class VideoModel extends Model {
await Future<void>.delayed(const Duration(seconds: 2));
if (_handshake == null) {
models.home.setMessage(
severity: Severity.warning,
severity: Severity.warning,
text: "Could not ${enable ? 'enable' : 'disable'} the ${name.humanName} camera",
);
}
}
}
}

/// An exception thrown when the rover does not respond to a handshake.
///
/// Certain changes require a handshake to ensure the rover has received and applied the change.
/// If the rover fails to acknowledge or apply the change, a response will not be sent. Throw
/// this error to indicate that.
/// this error to indicate that.
class RequestNotAccepted implements Exception { }
31 changes: 16 additions & 15 deletions lib/src/models/rover/metrics.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,32 @@ class RoverMetrics extends Model {

@override
Future<void> init() async {
models.messages.registerHandler<DriveData>(
models.messages.stream.onMessage(
name: DriveData().messageName,
decoder: DriveData.fromBuffer,
handler: drive.update,
constructor: DriveData.fromBuffer,
callback: drive.update,
);
models.messages.registerHandler<ScienceData>(
models.messages.stream.onMessage(
name: ScienceData().messageName,
decoder: ScienceData.fromBuffer,
handler: science.update,
constructor: ScienceData.fromBuffer,
callback: science.update,
);
models.messages.registerHandler<RoverPosition>(
models.messages.stream.onMessage(
name: RoverPosition().messageName,
decoder: RoverPosition.fromBuffer,
handler: position.update,
constructor: RoverPosition.fromBuffer,
callback: position.update,
);
models.messages.registerHandler<ArmData>(
models.messages.stream.onMessage(
name: ArmData().messageName,
decoder: ArmData.fromBuffer,
handler: arm.update,
constructor: ArmData.fromBuffer,
callback: arm.update,
);
models.messages.registerHandler<GripperData>(
models.messages.stream.onMessage(
name: GripperData().messageName,
decoder: GripperData.fromBuffer,
handler: gripper.update,
constructor: GripperData.fromBuffer,
callback: gripper.update,
);
drive.addListener(vitals.notify);
// versionTimer = Timer.periodic(versionInterval, _sendVersions);
}

Expand Down
18 changes: 9 additions & 9 deletions lib/src/models/rover/settings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import "package:rover_dashboard/models.dart";
const confirmationDelay = Duration(seconds: 1);

/// Updates sensitive settings on the rover.
///
/// Certain settings need confirmation that they were actually changed. Due to the nature of UDP,
///
/// Certain settings need confirmation that they were actually changed. Due to the nature of UDP,
/// we have no way to actually guarantee this, so we simply ask that the rover send the exact same
/// message in response (see [UpdateSetting]). If we do not get the response after waiting for a
/// confirmationDelay], we conclude that the rover didn't receive our request, similar to heartbeat.
Expand All @@ -22,15 +22,15 @@ class RoverSettings extends Model {

@override
Future<void> init() async {
models.messages.registerHandler<UpdateSetting>(
models.messages.stream.onMessage(
name: UpdateSetting().messageName,
decoder: UpdateSetting.fromBuffer,
handler: (settings) => _handshakes++,
constructor: UpdateSetting.fromBuffer,
callback: (settings) => _handshakes++,
);
}

/// Sends an [UpdateSetting] and awaits a response.
///
///
/// The response must be an echo of the data sent, to ensure the rover acknowledges the data.
/// Returns true if the handshake succeeds.
Future<bool> tryChangeSettings(UpdateSetting value) async {
Expand All @@ -45,15 +45,15 @@ class RoverSettings extends Model {
}

/// Sets the status of the rover.
///
///
/// See [RoverStatus] for details.
Future<void> setStatus(RoverStatus value) async {
if (!models.rover.isConnected) return;
if (value == RoverStatus.AUTONOMOUS || value == RoverStatus.IDLE) {
for (final controller in models.rover.controllers) {
controller.setMode(OperatingMode.none);
}
} else if (value == RoverStatus.MANUAL) {
} else if (value == RoverStatus.MANUAL) {
models.rover.setDefaultControls();
} else {
final message = UpdateSetting(status: value);
Expand All @@ -66,7 +66,7 @@ class RoverSettings extends Model {
settings.status = value;
models.rover.status.value = value;
notifyListeners();
}
}

/// Changes the color of the rover's LED strip.
Future<bool> setColor(ProtoColor color, {required bool blink}) async {
Expand Down
12 changes: 8 additions & 4 deletions lib/src/models/view/builders/autonomy_command.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "dart:async";

import "package:flutter/foundation.dart";
import "package:rover_dashboard/data.dart";
import "package:rover_dashboard/models.dart";
Expand All @@ -22,19 +24,21 @@ class AutonomyCommandBuilder extends ValueBuilder<AutonomyCommand> {
@override
List<ChangeNotifier> get otherBuilders => [models.rover.status];

StreamSubscription<AutonomyCommand>? _subscription;

/// Listens for incoming confirmations from the rover that it received the command.
Future<void> init() async {
await Future<void>.delayed(const Duration(seconds: 1));
models.messages.registerHandler<AutonomyCommand>(
_subscription = models.messages.stream.onMessage(
name: AutonomyCommand().messageName,
decoder: AutonomyCommand.fromBuffer,
handler: (data) => _handshake = data,
constructor: AutonomyCommand.fromBuffer,
callback: (data) => _handshake = data,
);
}

@override
void dispose() {
models.messages.removeHandler(AutonomyCommand().messageName);
_subscription?.cancel();
super.dispose();
}

Expand Down
Loading
Loading