Skip to content

Commit

Permalink
Reduce per-connectoin ram use for picow
Browse files Browse the repository at this point in the history
NUM_LISTENERS=4 now works, though still seems to suffer from problems
when the arena is reduced to 96kB, unclear how
  • Loading branch information
mkj committed Jun 17, 2024
1 parent 53884ed commit e3d8d73
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 41 deletions.
6 changes: 3 additions & 3 deletions embassy/demos/common/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ pub async fn listener<D: Driver, S: DemoServer>(stack: &'static Stack<D>,
// Should TX and RX be symmetrical? Or larger TX might fill ethernet
// frames more efficiently, RX doesn't matter so much?
// How does this interact with the channel copy buffer sizes?
let mut rx_buffer = [0; 1550];
let mut tx_buffer = [0; 1550];
let mut rx_tcp = [0; 1550];
let mut tx_tcp = [0; 1550];

let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
let mut socket = TcpSocket::new(stack, &mut rx_tcp, &mut tx_tcp);
// socket.set_nagle_enabled(false);
loop {

Expand Down
2 changes: 1 addition & 1 deletion embassy/demos/common/src/takepipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use embassy_futures::select::{select, Either};

use sunset_embassy::{SunsetRawMutex, SunsetMutex};

pub const READ_SIZE: usize = 4000;
pub const READ_SIZE: usize = 200;
pub const WRITE_SIZE: usize = 64;

// TODO: this is fairly ugly, the mutex and counter could perhaps be abstracted
Expand Down
7 changes: 3 additions & 4 deletions embassy/demos/picow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ embassy-net-wiznet = { version = "0.1.0", optional = true }

embassy-executor = { version = "0.5", features = [
"integrated-timers", "executor-thread", "arch-cortex-m", "log",
# Larger than around 120kB causes unknown failures. using "nightly" feature
# instead works OK. Needs investigation. 96kB is sufficient for NUM_LISTENERS=2
# https://github.com/embassy-rs/embassy/issues/3061
"task-arena-size-98304"] }
# This is sufficient for NUM_LISTENERS=4. It seems like it should fit in 96kB,
# but has failures.
"task-arena-size-131072"] }
embassy-time = { version = "0.3", features = [] }
embassy-rp = { version = "0.1", features = ["unstable-pac", "time-driver"] }
embassy-net = { version = "0.4", features = ["tcp", "dhcpv4", "medium-ethernet", "log"] }
Expand Down
35 changes: 23 additions & 12 deletions embassy/demos/picow/src/flashconfig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,21 @@ use demo_common::SSHConfig;
const CONFIG_OFFSET: u32 = 0x150000;
pub const FLASH_SIZE: usize = 2*1024*1024;

pub(crate) type Fl<'a> = Flash<'a, FLASH, Async, FLASH_SIZE>;
pub(crate) struct Fl<'a> {
flash: Flash<'a, FLASH, Async, FLASH_SIZE>,
// Only a single task can write to flash at a time,
// keeping a buffer here saves duplicated buffer space in each task.
buf: [u8; FlashConfig::BUF_SIZE],
}

impl<'a> Fl<'a> {
pub fn new(flash: Flash<'a, FLASH, Async, FLASH_SIZE>) -> Self {
Self {
flash,
buf: [0u8; FlashConfig::BUF_SIZE],
}
}
}

// SSHConfig::CURRENT_VERSION must be bumped if any of this struct changes
#[derive(SSHEncode, SSHDecode)]
Expand Down Expand Up @@ -69,10 +83,8 @@ pub async fn create(flash: &mut Fl<'_>) -> Result<SSHConfig> {
Ok(c)
}

pub async fn load(flash: &mut Fl<'_>) -> Result<SSHConfig> {
// let mut buf = [0u8; ERASE_SIZE];
let mut buf = [0u8; FlashConfig::BUF_SIZE];
flash.read(CONFIG_OFFSET, &mut buf).await.map_err(|e| {
pub async fn load(fl: &mut Fl<'_>) -> Result<SSHConfig> {
fl.flash.read(CONFIG_OFFSET, &mut fl.buf).await.map_err(|e| {
debug!("flash read error 0x{CONFIG_OFFSET:x} {e:?}");
Error::msg("flash error")
})?;
Expand All @@ -83,7 +95,7 @@ pub async fn load(flash: &mut Fl<'_>) -> Result<SSHConfig> {
// writeln!(b, "load {:?}", buf.hex_dump());
// info!("{}", &b.s);

let s: FlashConfig = sshwire::read_ssh(&buf, None)?;
let s: FlashConfig = sshwire::read_ssh(&fl.buf, None)?;

if s.version != SSHConfig::CURRENT_VERSION {
return Err(Error::msg("wrong config version"))
Expand All @@ -102,15 +114,14 @@ pub async fn load(flash: &mut Fl<'_>) -> Result<SSHConfig> {
}
}

pub async fn save(flash: &mut Fl<'_>, config: &SSHConfig) -> Result<()> {
let mut buf = [0u8; ERASE_SIZE];
pub async fn save(fl: &mut Fl<'_>, config: &SSHConfig) -> Result<()> {
let sc = FlashConfig {
version: SSHConfig::CURRENT_VERSION,
config: OwnOrBorrow::Borrow(&config),
hash: config_hash(&config)?,
};
let l = sshwire::write_ssh(&mut buf, &sc)?;
let buf = &buf[..l];
let l = sshwire::write_ssh(&mut fl.buf, &sc)?;
let buf = &fl.buf[..l];

// use pretty_hex::PrettyHex;
// use core::fmt::Write;
Expand All @@ -119,12 +130,12 @@ pub async fn save(flash: &mut Fl<'_>, config: &SSHConfig) -> Result<()> {
// info!("{}", &b.s);

trace!("flash erase");
flash.erase(CONFIG_OFFSET, CONFIG_OFFSET + ERASE_SIZE as u32)
fl.flash.erase(CONFIG_OFFSET, CONFIG_OFFSET + ERASE_SIZE as u32)
.await
.map_err(|_| Error::msg("flash erase error"))?;

trace!("flash write");
flash.write(CONFIG_OFFSET, &buf).await
fl.flash.write(CONFIG_OFFSET, &buf).await
.map_err(|_| Error::msg("flash write error"))?;

info!("flash save done");
Expand Down
18 changes: 10 additions & 8 deletions embassy/demos/picow/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use embassy_executor::Spawner;
use embassy_futures::select::{select, Either};
use embassy_net::{Stack, HardwareAddress, EthernetAddress};
use embedded_io_async::{Write, Read};
use embassy_rp::flash::Flash;

use heapless::{String, Vec};

Expand All @@ -21,7 +22,7 @@ use embassy_sync::mutex::Mutex;
use embassy_sync::blocking_mutex::raw::NoopRawMutex;

use sunset::*;
use sunset_embassy::{SSHServer, SunsetMutex, SunsetRawMutex, ProgressHolder};
use sunset_embassy::{SSHServer, SunsetMutex, ProgressHolder};
use sunset_demo_embassy_common as demo_common;
use demo_common::{SSHConfig, DemoServer, takepipe, ServerApp};
use takepipe::TakePipe;
Expand All @@ -42,7 +43,7 @@ compile_error!("No network device selected. Use cyw43 or w5500 feature");
#[cfg(all(feature = "cyw43", feature = "w5500"))]
compile_error!("Select only one of cyw43 or w5500");

const NUM_LISTENERS: usize = 2;
const NUM_LISTENERS: usize = 4;
// +1 for dhcp. referenced directly by wifi_stack() function
pub(crate) const NUM_SOCKETS: usize = NUM_LISTENERS + 1;

Expand All @@ -55,6 +56,7 @@ async fn main(spawner: Spawner) {
rtt_target::rtt_init_print!(NoBlockTrim, 4000);
// rtt_target::rtt_init_print!(BlockIfFull);
unsafe {
// thumbv6 doesn't have atomics normally needed by log
log::set_logger_racy(&LOGGER).unwrap();
log::set_max_level_racy(LOG_LEVEL);
}
Expand All @@ -69,7 +71,7 @@ async fn main(spawner: Spawner) {
getrandom::register_custom_getrandom!(caprand::getrandom);

// Configuration loaded from flash
let mut flash = flashconfig::Fl::new(p.FLASH, p.DMA_CH2);
let mut flash = flashconfig::Fl::new(Flash::new(p.FLASH, p.DMA_CH2));

let config = if option_env!("RESET_CONFIG").is_some() {
flashconfig::create(&mut flash).await.unwrap()
Expand All @@ -85,15 +87,15 @@ async fn main(spawner: Spawner) {
static USBS: StaticCell<takepipe::TakePipeStorage> = StaticCell::new();
static USBP: StaticCell<takepipe::TakePipe> = StaticCell::new();
let usb_pipe = {
let p = USBS.init(Default::default());
let p = USBS.init_with(Default::default);
USBP.init_with(|| p.build())
};

// A shared pipe to a local uart
static SERS: StaticCell<takepipe::TakePipeStorage> = StaticCell::new();
static SERP: StaticCell<takepipe::TakePipe> = StaticCell::new();
let serial1_pipe = {
let p = SERS.init(Default::default());
let p = SERS.init_with(Default::default);
SERP.init_with(|| p.build())
};

Expand All @@ -120,6 +122,7 @@ async fn main(spawner: Spawner) {
let HardwareAddress::Ethernet(EthernetAddress(eth)) = stack.hardware_address();
*state.net_mac.lock().await = eth;
for _ in 0..NUM_LISTENERS {
debug!("spawn listen");
spawner.spawn(cyw43_listener(&stack, config, state)).unwrap();
}
}
Expand Down Expand Up @@ -256,12 +259,11 @@ where
let r = async {
// TODO: could have a single buffer to translate in-place.
const DOUBLE: usize = 2 * takepipe::READ_SIZE;
let mut b = [0u8; takepipe::READ_SIZE];
let mut btrans = Vec::<u8, DOUBLE>::new();
loop {
let mut b = [0u8; takepipe::READ_SIZE];
let n = rx.read(&mut b).await?;
let b = &mut b[..n];
btrans.clear();
let mut btrans = Vec::<u8, DOUBLE>::new();
for c in b {
if *c == b'\n' {
// OK unwrap: btrans.len() = 2*b.len()
Expand Down
2 changes: 1 addition & 1 deletion embassy/demos/std/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl DemoServer for StdDemo {

async fn run(&self, serv: &SSHServer<'_>, mut common: ServerApp) -> Result<()>
{
let chan_pipe = Channel::<NoopRawMutex, ChanHandle, 1>::new();
let chan_pipe = Channel::<SunsetRawMutex, ChanHandle, 1>::new();

let prog_loop = async {
loop {
Expand Down
3 changes: 0 additions & 3 deletions embassy/src/embassy_channel.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
//! Presents SSH channels as async
//!
//! Most code in this module is a convenience wrapper for methods in
//! [embassy_sunset].
#[allow(unused_imports)]
use log::{debug, error, info, log, trace, warn};

Expand Down
18 changes: 9 additions & 9 deletions embassy/src/embassy_sunset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ impl<'a> EmbassySunset<'a> {
let rx_stop = Signal::<SunsetRawMutex, ()>::new();

let tx = async {
let mut buf = [0; 1024];
loop {
// TODO: make sunset read directly from socket, no intermediate buffer?
// Perhaps not possible async, might deadlock.
let mut buf = [0; 1024];
let l = self.output(&mut buf).await?;
if wsock.write_all(&buf[..l]).await.is_err() {
info!("socket write error");
Expand All @@ -153,11 +153,12 @@ impl<'a> EmbassySunset<'a> {
};
let tx = select(tx, tx_stop.wait());

// rxbuf outside the async block avoids an extraneous copy somehow
let mut rxbuf = [0; 1024];
let rx = async {
let mut buf = [0; 1024];
loop {
// TODO: make sunset read directly from socket, no intermediate buffer.
let l = match rsock.read(&mut buf).await {
let l = match rsock.read(&mut rxbuf).await {
Ok(0) => {
debug!("net EOF");
self.with_runner(|r| r.close_input()).await;
Expand All @@ -172,20 +173,19 @@ impl<'a> EmbassySunset<'a> {
break Err(Error::ChannelEOF)
}
};
let mut buf = &buf[..l];
while !buf.is_empty() {
let n = self.input(buf).await?;
let mut rxbuf = &rxbuf[..l];
while !rxbuf.is_empty() {
let n = self.input(rxbuf).await?;
self.wake_progress();
buf = &buf[n..];
rxbuf = &rxbuf[n..];
}
}
.inspect(|r| warn!("rx complete {r:?}"))
};

// TODO: if RX fails (bad decrypt etc) it doesn't cancel prog, so gets stuck
let rx = select(rx, rx_stop.wait());
let rx = async {
let r = rx.await;
let r = select(rx, rx_stop.wait()).await;
tx_stop.signal(());
r
};
Expand Down

0 comments on commit e3d8d73

Please sign in to comment.