Skip to content

Commit

Permalink
[src/event_graph/python] replace smol::block_on with await PyO3/pyo3-…
Browse files Browse the repository at this point in the history
  • Loading branch information
zerin committed Feb 11, 2025
1 parent 448783a commit 209b9bf
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 103 deletions.
71 changes: 0 additions & 71 deletions src/event_graph/python/.gitignore
Original file line number Diff line number Diff line change
@@ -1,72 +1 @@
/target

# Byte-compiled / optimized / DLL files
__pycache__/
.pytest_cache/
*.py[cod]

# C extensions
*.so

# Distribution / packaging
.Python
.venv/
env/
bin/
build/
develop-eggs/
dist/
eggs/
lib/
lib64/
parts/
sdist/
var/
include/
man/
venv/
*.egg-info/
.installed.cfg
*.egg

# Installer logs
pip-log.txt
pip-delete-this-directory.txt
pip-selfcheck.json

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml

# Translations
*.mo

# Mr Developer
.mr.developer.cfg
.project
.pydevproject

# Rope
.ropeproject

# Django stuff:
*.log
*.pot

.DS_Store

# Sphinx documentation
docs/_build/

# PyCharm
.idea/

# VSCode
.vscode/

# Pyenv
.python-version
56 changes: 27 additions & 29 deletions src/event_graph/python/src/event_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use super::{p2p::P2pPtr, sled::SledDb};
use darkfi::{event_graph, event_graph::event, net};
use darkfi::{event_graph, event_graph::event, net, Error};
use pyo3::{
prelude::PyModule,
pyclass, pyfunction, pymethods,
Expand All @@ -37,6 +37,7 @@ pub struct EventGraph(pub event_graph::EventGraph);

#[pyfunction]
fn new_event_graph<'a>(
py: Python<'a>,
p2p: &P2pPtr,
sled_db: &SledDb,
datastore: PathBuf,
Expand All @@ -49,21 +50,23 @@ fn new_event_graph<'a>(
// because lifetime has no meaning in python which is
// a reference-counted language
let ex = Arc::new(Executor::new());
let dag_tree_name_bind = dag_tree_name.clone();
let p2p_ptr: net::P2pPtr = p2p.0.clone();
let sled_db_bind: sled::Db = sled_db.0.clone();
let event_graph = event_graph::EventGraph::new(
p2p_ptr,
sled_db_bind,
datastore,
replay_mode,
&*dag_tree_name_bind,
days_rotation,
ex.clone(),
);

let eg_res: Result<Arc<event_graph::EventGraph>, darkfi::Error> =
smol::future::block_on(ex.run(event_graph)); //event_graph.await;
let eg_res: Result<Arc<event_graph::EventGraph>, Error> =
pyo3_async_runtimes::async_std::run(py, async move {
let event_graph = event_graph::EventGraph::new(
p2p_ptr,
sled_db_bind,
datastore,
replay_mode,
&*dag_tree_name,
days_rotation,
ex.clone(),
);
let eg_res = ex.run(event_graph).await;
Ok(eg_res)
})
.unwrap();
let eg: Arc<event_graph::EventGraph> = eg_res.unwrap();
//note! pyclass implements IntoPy<PyObject> for EventGraphPtr
let eg_pyclass: EventGraphPtr = EventGraphPtr(eg);
Expand Down Expand Up @@ -113,7 +116,7 @@ async fn dag_insert_wait(
events_native: Vec<event::Event>,
) -> Vec<blake3::Hash> {
let fut = eg_ptr.dag_insert(&events_native[..]);
let ids_res: Result<Vec<blake3::Hash>, darkfi::Error> = fut.await;
let ids_res: Result<Vec<blake3::Hash>, Error> = fut.await;
let ids: Vec<blake3::Hash> = ids_res.unwrap();
async_std::task::sleep(std::time::Duration::from_secs(w8_time)).await;
ids
Expand All @@ -129,18 +132,16 @@ impl EventGraphPtr {
})
}

fn dag_insert<'a>(&'a self, events: Vec<Bound<Event>>) -> PyResult<Vec<Hash>> {
fn dag_insert<'a>(&'a self, py: Python<'a>, events: Vec<Bound<Event>>) -> PyResult<Vec<Hash>> {
let eg_ptr: event_graph::EventGraphPtr = self.0.clone();
let events_native: Vec<event::Event> =
events.iter().map(|i| i.borrow().deref().0.clone()).collect();
let ids = smol::future::block_on(eg_ptr.p2p.executor.run(dag_insert_wait(
5,
eg_ptr.clone(),
events_native,
)));
let ids_native: Vec<Hash> = ids.iter().map(|i| Hash(i.clone())).collect();
//async_std::task::sleep(std::time::Duration::from_secs(5)).await;
Ok(ids_native)
pyo3_async_runtimes::async_std::run(py, async move {
let ids =
eg_ptr.p2p.executor.run(dag_insert_wait(5, eg_ptr.clone(), events_native)).await;
let ids_native: Vec<Hash> = ids.iter().map(|i| Hash(i.clone())).collect();
Ok(ids_native)
})
}

fn dag_get<'a>(
Expand All @@ -151,8 +152,7 @@ impl EventGraphPtr {
let eg_ptr: event_graph::EventGraphPtr = self.0.clone();
let event_id: blake3::Hash = event_id_native.borrow().deref().0.clone();
pyo3_async_runtimes::async_std::future_into_py(py, async move {
let event_res: Result<Option<event::Event>, darkfi::Error> =
eg_ptr.dag_get(&event_id).await;
let event_res: Result<Option<event::Event>, Error> = eg_ptr.dag_get(&event_id).await;
let event: event::Event = event_res
.unwrap()
.expect(&format!("expecting event in return with id: {}", event_id).to_string());
Expand All @@ -161,11 +161,9 @@ impl EventGraphPtr {
})
}

fn dag_len(&self, _py: Python) -> usize {
fn dag_len(&self) -> usize {
let eg_ptr: event_graph::EventGraphPtr = self.0.clone();
eg_ptr.dag_len()
//TODO impl
//0
}

fn order_events<'a>(&'a self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
Expand Down
6 changes: 3 additions & 3 deletions src/event_graph/python/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ fn start_p2p<'a>(
let ex = p2p_ptr.clone().executor.clone();
let start_p2p_fut = start_p2p_and_wait(w8_time, p2p_ptr.clone());
pyo3_async_runtimes::async_std::future_into_py(py, async move {
smol::future::block_on(ex.run(start_p2p_fut));
ex.run(start_p2p_fut).await;
Ok(())
})
}
Expand Down Expand Up @@ -230,7 +230,7 @@ fn stop_p2p<'a>(
let p2p_ptr: net::P2pPtr = net_p2p_ptr.0.clone();
let ex = p2p_ptr.executor.clone();
pyo3_async_runtimes::async_std::future_into_py(py, async move {
smol::future::block_on(ex.run(stop_p2p_and_wait(w8_time, p2p_ptr.clone())));
ex.run(stop_p2p_and_wait(w8_time, p2p_ptr.clone())).await;
Ok(())
})
}
Expand All @@ -251,7 +251,7 @@ fn broadcast_p2p<'a>(
let ex = p2p_ptr.executor.clone();
let event: event_graph::Event = event_py.borrow().deref().0.clone();
pyo3_async_runtimes::async_std::future_into_py(py, async move {
smol::future::block_on(ex.run(broadcast_and_wait(w8_time, p2p_ptr.clone(), event.clone())));
ex.run(broadcast_and_wait(w8_time, p2p_ptr.clone(), event.clone())).await;
Ok(())
})
}
Expand Down

0 comments on commit 209b9bf

Please sign in to comment.