Skip to content

Commit

Permalink
Asynchronous robot liability content provider #33
Browse files Browse the repository at this point in the history
  • Loading branch information
strdn committed Nov 14, 2019
1 parent 4e486ec commit 9c98ff6
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 67 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ sudo: false
language: rust
cache: cargo
rust:
- nightly
- nightly-2019-07-20

matrix:
include:
Expand All @@ -11,7 +11,7 @@ matrix:
- os: windows

install:
- rustup target add wasm32-unknown-unknown --toolchain nightly
- rustup target add wasm32-unknown-unknown --toolchain nightly-2019-07-20
- cargo install --git https://github.com/alexcrichton/wasm-gc --force

script:
Expand Down
215 changes: 203 additions & 12 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ serde_json = "1.0"
hex-literal = "0.2"
exit-future = "0.1"
parking_lot = "0.4"
ipfs-api = "0.5"
futures03 = { package = "futures-preview", version = "0.3.0-alpha.17", features = ["compat"] }
futures03_util = { package = "futures-util-preview", version = "0.3.0-alpha.17", features = ["compat"] }
ctrlc = { version = "3.0", features = ["termination"] }
Expand Down
5 changes: 4 additions & 1 deletion node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use primitives::{Pair, sr25519};
use futures::prelude::*;
use std::sync::Arc;
use log::info;
use ipfs_api::IpfsClient;

use futures03::channel::mpsc;
use futures03_util::stream::StreamExt;
Expand Down Expand Up @@ -126,7 +127,9 @@ construct_service_factory! {
);
service.spawn_task(Box::new(api.unit_error().boxed().compat()));

let (fut, liability_engine_services, liability_engine_subscribers) = ros_robonomics::start_liability_engine().unwrap();
let ipfs_client = Arc::new(IpfsClient::default());

let (fut, liability_engine_services, liability_engine_subscribers) = ros_robonomics::start_liability_engine(ipfs_client).unwrap();
service.spawn_task(Box::new(fut.unit_error().boxed().compat()));

let system_info = ros_rpc::system::SystemInfo {
Expand Down
2 changes: 1 addition & 1 deletion substrate-ros/robonomics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ log = "0.4"
bs58 = "0.2"
rosbag = "0.2"
rosrust = "0.8"
ipfsapi = "0.3.0"
ipfs-api = "0.5"
futures-timer = "0.2"
futures01 = { package = "futures", version = "0.1" }
futures-preview = { version = "0.3.0-alpha.17", features = ["compat", "async-await", "nightly"] }
Expand Down
130 changes: 84 additions & 46 deletions substrate-ros/robonomics/src/liability_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
//! This module exports Robonomics API into ROS namespace.
use std::{
io::Write,
fs::File,
sync::{Arc,
Mutex},
collections::HashMap,
};
use ipfsapi::IpfsApi;
use ipfs_api::IpfsClient;
use futures::{
future::Either,
prelude::*,
io::AllowStdIo,
compat::Stream01CompatExt,
compat::Future01CompatExt
};
use msgs::{
substrate_ros_msgs::{Liability,
Expand All @@ -37,6 +37,12 @@ use msgs::{
use rosrust::api::error::Error;
use crate::rosbag_player::build_players;

use futures01::Stream;

use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::channel::oneshot::Receiver;

/// ROS Pub/Sub queue size.
/// http://wiki.ros.org/roscpp/Overview/Publishers%20and%20Subscribers#Queueing_and_Lazy_Deserialization
const QUEUE_SIZE: usize = 10;
Expand All @@ -45,73 +51,105 @@ const LIABILITY_PREPARE_FOR_EXECUTION_TOPIC_NAME: &str = "/liability/prepare";
const LIABILITY_READY_TOPIC_NAME: &str = "liability/ready";
const LIABILITY_START_SRV_NAME: &str = "/liability/start";

use futures::channel::mpsc;

use futures::channel::oneshot;
use futures::channel::oneshot::Receiver;

fn add_liability_stream(
stream: mpsc::UnboundedReceiver<(Liability, Receiver<()>)>,
// we must download liability objective file from IPFS
// and after downloading we can build rosbag player for liability objective
fn create_liability_player_stream(
stream: mpsc::UnboundedReceiver<(Liability, Receiver<()>)>,
ipfs_client: Arc<IpfsClient>
) -> impl Future<Output=()> {

let liability_ready_pub = rosrust::publish(LIABILITY_READY_TOPIC_NAME, QUEUE_SIZE).unwrap();
// rostopic for publishing info about ready-to-start liabilities
let mut liability_ready_pub = Arc::new(rosrust::publish(LIABILITY_READY_TOPIC_NAME, QUEUE_SIZE).unwrap());

stream.for_each_concurrent( 10, move |(liability, l_lock)| {
let l = liability.clone();
let bag_hash = liability.order.objective;
let liability_id = liability.id;
let bag_hash = liability.order.objective.clone();
let mut status_sender = liability_ready_pub.clone();

log::debug!("Received liability {:?}", l);
let rbplayer = build_players(bag_hash.clone().as_str()).unwrap();
log::debug!("Construct player for {:?}", bag_hash);
liability_ready_pub.send(l.clone()).unwrap();
l_lock.then(|_| rbplayer)
download_liability_objective(ipfs_client.clone(), l.clone())
.then(move |_| {
let rbplayer = build_players(bag_hash.clone().as_str());
match rbplayer {
Ok(player) => {
log::info!("Construct player for {:?}", bag_hash);
status_sender.send(l.clone());
Either::Left(l_lock.then(|_| player))
},
Err(e) => {
log::error!("Failed to construct player for liability {} with error {}", l.id, e);
Either::Right(future::ready(()))
}
}
})
})
}

pub fn start_liability_engine()
-> Result<(impl Future<Output=()> + 'static, Vec<rosrust::Service>, Vec<rosrust::Subscriber>), Error> {
let api = IpfsApi::new("127.0.0.1", 5001);
// download liability objective Future
// `liability` message and oneshot start receiver will passed for preparing rosbag player future after objective downloading
// otherwise (failed downloading) rosbag player will not be build
async fn download_liability_objective(
ipfs_client: Arc<IpfsClient>,
liability: Liability,
) {
let bag_hash = liability.order.objective.clone();
let response = ipfs_client.cat(bag_hash.as_str()).concat2().compat().into_future().await;
match response {
Ok(bytes) => {
let bag_file = File::create(bag_hash.as_str()).expect("could not create file");
let mut buffer = AllowStdIo::new(bag_file);
buffer.write_all(&bytes).await;
buffer.close().await;
},
Err(e) => {
log::error!("IPFS: Failed to download file {:?}", e);
}
}
}

pub fn start_liability_engine(
ipfs_client: Arc<IpfsClient>,
) -> Result<(impl Future<Output=()>, Vec<rosrust::Service>, Vec<rosrust::Subscriber>), Error> {
let mut services = vec![];
let mut subscribers = vec![];

let (liability_tx, liability_rx) = mpsc::unbounded::<(Liability, Receiver<()>)>();
let add_liabilities_stream = add_liability_stream(liability_rx);
let (construct_player_tx, construct_player_rx) = mpsc::unbounded::<(Liability, Receiver<()>)>();
let liability_player_stream = create_liability_player_stream( construct_player_rx, ipfs_client);

let locks_hash_map00 = Arc::new(Mutex::new(HashMap::new()));
let locks01 = Arc::clone(&locks_hash_map00);
let locks02 = Arc::clone(&locks_hash_map00);
// hashmap for store liability id -> oneshot start sender
// rosbag player must be ready for start, but not play messages until /liability/start service will be called
let locks_map00 = Arc::new(Mutex::new(HashMap::new()));
let locks_map01 = Arc::clone(&locks_map00);

// listen rostopic and initiate rosbag player preparation
subscribers.push(
rosrust::subscribe(LIABILITY_PREPARE_FOR_EXECUTION_TOPIC_NAME, QUEUE_SIZE, move |l: Liability| {
let bag_hash = l.order.objective.clone();
let bytes = api.cat(bag_hash.as_str());
match bytes {
Ok(reads) => {
let data = reads.collect::<Vec<_>>();
let mut bag_file = File::create(bag_hash.as_str()).expect(format!("could not create file {}", bag_hash).as_str());
bag_file.write_all(&data);

let (locks_tx, locks_rx) = oneshot::channel();
let mut lhm = locks01.lock().unwrap();
if ! lhm.contains_key(&l.id) {
liability_tx.unbounded_send((l.clone(), locks_rx)).unwrap();
lhm.insert(l.id, locks_tx);
}
},
Err(e) => log::error!("IPFS: Failed to download file {:?}", e)
let (locks_tx, locks_rx) = oneshot::channel();
let mut lhm = locks_map00.lock().unwrap();
if ! lhm.contains_key(&l.id) {
construct_player_tx.unbounded_send((l.clone(), locks_rx)).unwrap();
lhm.insert(l.id, locks_tx);
}
}).expect("failed to create incoming liability subscriber")
);

services.push(rosrust::service::<StartLiabilityPlayer, _>(LIABILITY_START_SRV_NAME, move |req| {
let mut res = StartLiabilityPlayerRes::default();
let mut lhm = locks02.lock().unwrap();
let lock_sender = lhm.remove(&req.id).unwrap();
lock_sender.send(());
res.success = true;
let mut lhm = locks_map01.lock().unwrap();
// get start sender and send start signal
match lhm.remove(&req.id) {
Some(sender) => {
sender.send(());
res.success = true;
res.msg = "Start signal sent successfully".to_string();
}
None => {
res.success = false;
res.msg = "Unable to find ready to run liability player".to_string();
}
}
Ok(res)
})?);

Ok((add_liabilities_stream.map(|_| ()), services, subscribers))
Ok((liability_player_stream.map(|_| ()), services, subscribers))
}
9 changes: 4 additions & 5 deletions substrate-ros/robonomics/src/rosbag_player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

use rosrust::api::raii::Publisher;
use std::collections::HashMap;
use rosbag::{RosBag, Record, RecordsIterator};
use rosbag::{RosBag, Record};
use futures_timer::Delay;
use std::time;
use msgs::std_msgs;

use futures::{prelude::*, io::AllowStdIo, compat::Stream01CompatExt, Poll};
use rosbag::record_types::{MessageData, Connection};
use futures::prelude::*;
use rosbag::record_types::Connection;
use futures::io::Error;
use std::sync::Arc;

Expand All @@ -39,8 +39,7 @@ players_builder!(

pub fn build_players(path: &str) -> Result<impl Future<Output=()>, Error> where
{
let bag = Arc::new(RosBag::new(path).unwrap());
return Ok(players_builder(bag))
RosBag::new(path).map(|rosbag| players_builder(Arc::new(rosbag)))
}

struct RosbagPlayer<T> where
Expand Down

0 comments on commit 9c98ff6

Please sign in to comment.