Skip to content

Commit

Permalink
Wrapper registry (#118)
Browse files Browse the repository at this point in the history
This PR factors out the `registerHandler` code from the `DashboardSocket` and makes a new mixin, `WrapperRegistry` that can be used on _any_ class. This is to help make unifying UDP and Serial easier.
  • Loading branch information
Levi-Lesches authored Jul 20, 2023
1 parent fc26917 commit 7685704
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 109 deletions.
1 change: 1 addition & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ linter:
always_use_package_imports: false # not when importing sibling files
sort_constructors_first: false # final properties, then constructor
avoid_dynamic_calls: false # this lint takes over errors in the IDE
one_member_abstracts: false # abstract classes are good for interfaces

# Temporarily disabled until we are ready to document
# public_member_api_docs: false
69 changes: 44 additions & 25 deletions lib/src/models/data/sockets.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,43 @@ import "package:rover_dashboard/services.dart";
/// Coordinates all the sockets to point to the right [RoverType].
class Sockets extends Model {
/// A UDP socket for sending and receiving Protobuf data.
final data = DashboardSocket(device: Device.SUBSYSTEMS);
late final data = DashboardSocket(
device: Device.SUBSYSTEMS,
onConnect: onConnect,
onDisconnect: onDisconnect,
);

/// A UDP socket for receiving video.
final video = DashboardSocket(device: Device.VIDEO);
late final video = DashboardSocket(
device: Device.VIDEO,
onConnect: onConnect,
onDisconnect: onDisconnect,
);

/// A UDP socket for receiving video.
final video2 = DashboardSocket(device: Device.VIDEO);
late final video2 = DashboardSocket(
device: Device.VIDEO,
onConnect: onConnect,
onDisconnect: onDisconnect,
);

/// A UDP socket for controlling autonomy.
final autonomy = DashboardSocket(device: Device.AUTONOMY, allowedFallthrough: {AutonomyData().messageName});

/// A UDP socket for controlling rover position
final mars = DashboardSocket(device: Device.MARS_SERVER);
late final autonomy = DashboardSocket(
device: Device.AUTONOMY,
onConnect: onConnect,
onDisconnect: onDisconnect,
allowedFallthrough: {AutonomyData().messageName},
);

/// A UDP socket for controlling the MARS subsystem.
late final mars = DashboardSocket(
device: Device.MARS_SERVER,
onConnect: onConnect,
onDisconnect: onDisconnect,
);

/// A list of all the sockets this model manages.
late final List<DashboardSocket> sockets = [data, video, video2, autonomy, mars];
List<DashboardSocket> get sockets => [data, video, video2, autonomy, mars];

/// The rover-like system currently in use.
RoverType rover = RoverType.rover;
Expand All @@ -39,7 +60,6 @@ class Sockets extends Model {
Future<void> init() async {
for (final socket in sockets) {
await socket.init();
socket.event.addListener(() => onNewEvent(socket.device, socket.event.value));
}
await updateSockets();
}
Expand All @@ -52,32 +72,31 @@ class Sockets extends Model {
super.dispose();
}

/// Notifies the user when a device connects or disconnects.
void onNewEvent(Device device, HeartbeatEvent event) {
switch (event) {
case HeartbeatEvent.connected:
models.home.setMessage(severity: Severity.info, text: "The ${device.humanName} has connected");
if (device == Device.SUBSYSTEMS) models.rover.status.value = models.rover.settings.status;
case HeartbeatEvent.disconnected:
models.home.setMessage(severity: Severity.critical, text: "The ${device.humanName} has disconnected");
if (device == Device.SUBSYSTEMS) models.rover.status.value = RoverStatus.DISCONNECTED;
if (device == Device.VIDEO) models.video.reset();
if (device == Device.MARS_SERVER) models.rover.metrics.mars.clearStatus();
case HeartbeatEvent.none:
}
/// Notifies the user when a new device has connected.
void onConnect(Device device) {
models.home.setMessage(severity: Severity.info, text: "The ${device.humanName} has connected");
if (device == Device.SUBSYSTEMS) models.rover.status.value = models.rover.settings.status;
}

/// Notifies the user when a device has disconnected.
void onDisconnect(Device device) {
models.home.setMessage(severity: Severity.critical, text: "The ${device.humanName} has disconnected");
if (device == Device.SUBSYSTEMS) models.rover.status.value = RoverStatus.DISCONNECTED;
if (device == Device.VIDEO) models.video.reset();
if (device == Device.MARS_SERVER) models.rover.metrics.mars.clearStatus();
}

/// Set the right IP addresses for the rover or tank.
Future<void> updateSockets() async {
// Initialize sockets
// 1. Initialize sockets
final settings = models.settings.network;
data.destination = settings.subsystemsSocket.copy();
video.destination = settings.videoSocket.copy();
video2.destination = SocketInfo(address: InternetAddress("192.168.1.30"), port: 8007);
autonomy.destination = settings.autonomySocket.copy();
mars.destination = settings.marsSocket.copy();

// Change IP addresses
// 2. Change IP addresses
for (final socket in sockets) {
socket.destination!.address = switch (rover) {
RoverType.rover => socket.destination!.address,
Expand All @@ -86,7 +105,7 @@ class Sockets extends Model {
};
}

// Reset
// 3. Reset
await reset();
}

Expand Down
113 changes: 31 additions & 82 deletions lib/src/services/socket.dart
Original file line number Diff line number Diff line change
@@ -1,38 +1,43 @@
import "package:burt_network/burt_network.dart";
import "package:flutter/foundation.dart"; // <-- Used for ValueNotifier
import "package:protobuf/protobuf.dart";

import "package:rover_dashboard/data.dart";

import "service.dart";

/// A callback to execute with a specific serialized Protobuf message.
typedef MessageHandler<T extends Message> = void Function(T);

/// A callback to execute with raw Protobuf data.
typedef RawDataHandler = void Function(List<int> data);
import "wrapper_registry.dart";

/// A service to send and receive Protobuf messages over a UDP socket, using [ProtoSocket].
///
/// This class monitors its connection to the given [device] by sending heartbeats periodically and
/// logging the response (or lack thereof).
/// - Heartbeats are sent via [checkHeartbeats]
/// - The strength of the connection is exposed via [connectionStrength], which is also a [ValueNotifier].
/// - For a simple connection check, use [isConnected].
/// - Use the [event] [ValueNotifier] to listen for new or dropped connections.
///
/// To use this class:
/// - Call [init] to open the socket.
/// - Check [connectionStrength] for the connection to the given [device].
/// - To send a message, call [sendMessage].
/// - To be notified when a message is received, call [registerHandler].
class DashboardSocket extends ProtoSocket implements Service {
// ================== Final fields ==================
/// - To be notified when a message is received, call [registerHandler].
/// - To remove your handler, call [removeHandler].
/// - Call [dispose] to close the socket.
class DashboardSocket extends ProtoSocket with WrapperRegistry implements Service {
/// A list of message names that are allowed to pass without a handler.
@override
final Set<String> allowedFallthrough;
/// The handlers registered by [registerHandler].
final Map<String, RawDataHandler> _handlers = {};

/// A callback to run when the [device] has connected.
void Function(Device device) onConnect;
/// A callback to run when the [device] has disconnected.
void Function(Device device) onDisconnect;

/// Listens for incoming messages on a UDP socket and sends heartbeats to the [device].
DashboardSocket({required super.device, this.allowedFallthrough = const {}}) : super(port: 0);
DashboardSocket({required this.onConnect, required this.onDisconnect, required super.device, this.allowedFallthrough = const {}}) : super(port: 0);

// ================== Mutable fields ==================
/// The connection strength, as a percentage to this [device].
final connectionStrength = ValueNotifier<double>(0);

/// The latest [HeartbeatEvent] emitted by this socket.
final event = ValueNotifier(HeartbeatEvent.none);

/// The number of heartbeats received since the last heartbeat was sent.
int _heartbeats = 0;

Expand All @@ -42,89 +47,33 @@ class DashboardSocket extends ProtoSocket implements Service {
/// Whether this socket has a stable connection to the [device].
bool get isConnected => connectionStrength.value > 0;

// ================== Overriden methods ==================

@override
void onMessage(WrappedMessage wrapper) {
final rawHandler = _handlers[wrapper.name];
if (rawHandler == null) {
if (allowedFallthrough.contains(wrapper.name)) return;
throw StateError("No handler registered for ${wrapper.name} message on the $device socket");
}
try { return rawHandler(wrapper.data); }
on InvalidProtocolBufferException {
try { return rawHandler(wrapper.data); }
on InvalidProtocolBufferException { /* Nothing we can do */ }
}
}
void updateSettings(UpdateSetting settings) { }

@override
void updateSettings(UpdateSetting settings) { }
void onHeartbeat(Connect heartbeat, SocketInfo source) => _heartbeats++;

@override
Future<void> checkHeartbeats() async {
if (_isChecking) return;
// 1. Clear state and send a heartbeat
_isChecking = true;
_heartbeats = 0;
final wasConnected = isConnected;
sendMessage(Connect(sender: Device.DASHBOARD, receiver: device));
// 2. Wait a bit and count the number of responses
await Future<void>.delayed(heartbeatWaitDelay);
final wasConnected = isConnected;
if (_heartbeats > 0) {
if (!wasConnected) event.value = HeartbeatEvent.connected;
if (_heartbeats > 0) {
connectionStrength.value += connectionIncrement * _heartbeats;
} else {
if (wasConnected) event.value = HeartbeatEvent.disconnected;
connectionStrength.value -= connectionIncrement;
}
// 3. Assess the current state
connectionStrength.value = connectionStrength.value.clamp(0, 1);
if (isConnected && !wasConnected) onConnect(device);
if (wasConnected && !isConnected) onDisconnect(device);
_isChecking = false;
}

@override
void onHeartbeat(Connect heartbeat, SocketInfo source) => _heartbeats++;

// ================== Public methods ==================

/// Adds a handler for a given type.
///
/// When a new message is received, [onMessage] checks to see if its type matches [name].
/// If so, it calls [decoder] to parse the binary data into a Protobuf class, and then
/// calls [handler] with that parsed data class.
///
/// For example, with a message called `ScienceData`, you would use this function as:
/// ```dart
/// registerHandler<ScienceData>(
/// name: ScienceData().messageName, // identify the data as a ScienceData message
/// decoder: ScienceData.fromBuffer, // deserialize into a ScienceData instance
/// handler: (ScienceData data) => print(data.methane), // do something with the data
/// );
/// ```
void registerHandler<T extends Message>({
required String name,
required MessageDecoder<T> decoder,
required MessageHandler<T> handler,
}) {
if (_handlers.containsKey(name)) { // handler was already registered
throw ArgumentError("There's already a message handler for $name.");
} else {
_handlers[name] = (data) => handler(decoder(data));
}
}

/// Removes the handler for a given message type.
///
/// This is useful if you register a handler to update a piece of UI that is no longer on-screen.
void removeHandler(String name) => _handlers.remove(name);
}

/// An event representing a change in network connection status.
enum HeartbeatEvent {
/// The device just connected.
connected,
/// The device just disconnected.
disconnected,
/// Nothing just happened. Useful for an initial value.
none
}

/// How much each successful/missed handshake is worth, as a percent.
Expand Down
66 changes: 66 additions & 0 deletions lib/src/services/wrapper_registry.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import "package:protobuf/protobuf.dart";

import "package:rover_dashboard/data.dart";

/// A callback to execute with a specific serialized Protobuf message.
typedef MessageHandler<T extends Message> = void Function(T);

/// A callback to execute with raw Protobuf data.
typedef RawDataHandler = void Function(List<int> data);

/// A mixin that delegates [WrappedMessage]s to a handler via [registerHandler].
///
/// - Use [registerHandler] to invoke your handler whenever a new message is received.
/// - Use [removeHandler] to remove your handler.
/// - Override [allowedFallthrough] to allow certain massages to pass unhandled.
mixin WrapperRegistry {
final Map<String, RawDataHandler> _handlers = {};

/// A set of message types that are allowed to pass through without being handled.
Set<String> get allowedFallthrough;

/// Delegates the message contents to the appropriate handler.
void onMessage(WrappedMessage wrapper) {
final rawHandler = _handlers[wrapper.name];
if (rawHandler == null) {
if (allowedFallthrough.contains(wrapper.name)) return;
throw StateError("No handler registered for ${wrapper.name} message");
}
try { return rawHandler(wrapper.data); }
on InvalidProtocolBufferException {
try { return rawHandler(wrapper.data); }
on InvalidProtocolBufferException { /* Nothing we can do */ }
}
}

/// Adds a handler for the given message type.
///
/// When a new message is received, [onMessage] checks to see if its type matches [name].
/// If so, it calls [decoder] to parse the binary data into a Protobuf class, and then
/// calls [handler] with that parsed data class.
///
/// For example, with a message called `ScienceData`, you would use this function as:
/// ```dart
/// registerHandler<ScienceData>(
/// name: ScienceData().messageName, // identify the data as a ScienceData message
/// decoder: ScienceData.fromBuffer, // deserialize into a ScienceData instance
/// handler: (ScienceData data) => print(data.methane), // do something with the data
/// );
/// ```
void registerHandler<T extends Message>({
required String name,
required MessageDecoder<T> decoder,
required MessageHandler<T> handler,
}) {
if (_handlers.containsKey(name)) { // handler was already registered
throw ArgumentError("There's already a message handler for $name.");
} else {
_handlers[name] = (data) => handler(decoder(data));
}
}

/// Removes the handler for a given message type.
///
/// This is useful if you register a handler to update a piece of UI that is no longer on-screen.
void removeHandler(String name) => _handlers.remove(name);
}
2 changes: 0 additions & 2 deletions lib/src/widgets/atomic/video_feed.dart
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,11 @@ class VideoFeedState extends State<VideoFeed> {
super.initState();
data = models.video.feeds[widget.name]!;
models.video.addListener(updateImage);
models.sockets.video.event.addListener(updateImage);
}

@override
void dispose() {
models.video.removeListener(updateImage);
models.sockets.video.event.removeListener(updateImage);
imageLoader.dispose();
super.dispose();
}
Expand Down

0 comments on commit 7685704

Please sign in to comment.