Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscorn committed Dec 18, 2023
1 parent 6927e54 commit 5b0e325
Show file tree
Hide file tree
Showing 18 changed files with 405 additions and 259 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
### Generated by gibo (https://github.com/simonwhitaker/gibo)
### https://raw.github.com/github/gitignore/e5323759e387ba347a9d50f8b0ddd16502eb71d4/Rust.gitignore

# Prevent not to commit example outputs
output.*

# Generated by Cargo
# will have compiled files and executables
debug/
Expand Down
2 changes: 2 additions & 0 deletions nusamai-plateau/citygml/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum ParseError {
SchemaViolation(String),
#[error("Invalid value: {0}")]
InvalidValue(String),
#[error("cancelled")]
Cancelled,
}

pub struct CityGMLReader {
Expand Down
3 changes: 2 additions & 1 deletion nusamai-plateau/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod models;
use serde::{Deserialize, Serialize};

#[derive(Debug)]
#[derive(Debug, Deserialize, Serialize)]
pub struct TopLevelCityObject {
pub root: citygml::object::ObjectValue,
pub geometries: citygml::geometry::Geometries,
Expand Down
3 changes: 3 additions & 0 deletions nusamai/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ citygml = { path = "../nusamai-plateau/citygml" }
quick-xml = "0.31.0"
clap = { version = "4.4.11", features = ["derive"] }
thiserror = "1.0.51"
ctrlc = "3.4.1"
bincode = "1.3.3"
lz4_flex = "0.11.1"

[dev-dependencies]
rand = "0.8.5"
File renamed without changes.
3 changes: 3 additions & 0 deletions nusamai/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
pub mod configuration;
pub mod pipeline;
pub mod sink;
pub mod source;
pub mod transform;
205 changes: 48 additions & 157 deletions nusamai/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,185 +1,73 @@
use std::io::BufRead;
use std::sync::{Arc, Mutex};

use clap::Parser;
use rayon::{prelude::*, ThreadPoolBuilder};

use citygml::{CityGMLElement, CityGMLReader, ParseError, SubTreeReader};
use nusamai::configuration::Config;
use nusamai::pipeline::{self, TransformError};
use nusamai::pipeline::{
feedback, Feedback, Percel, Sender, Sink, SinkInfo, SinkProvider, Source, SourceInfo,
SourceProvider, Transformer,
};
use nusamai_plateau::TopLevelCityObject;
use nusamai::pipeline::Canceller;
use nusamai::sink::{noop::NoopSinkProvider, serde::SerdeSinkProvider};
use nusamai::sink::{DataSink, DataSinkProvider};
use nusamai::source::citygml::CityGMLSourceProvider;
use nusamai::source::{DataSource, DataSourceProvider};
use nusamai::transform::NoopTransformer;

pub struct CityGMLSourceProvider {
// FIXME: Use the configuration mechanism
filenames: Vec<String>,
}

impl SourceProvider for CityGMLSourceProvider {
fn create(&self, _config: &Config) -> Box<dyn Source> {
Box::new(CityGMLSource {
filenames: self.filenames.clone(),
})
}

fn info(&self) -> SourceInfo {
SourceInfo {
name: "Dummy Source".to_string(),
}
}

fn config(&self) -> Config {
Config::default()
}
}
#[derive(clap::Parser)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(value_enum, long)]
sink: SinkChoice,

pub struct CityGMLSource {
#[arg()]
filenames: Vec<String>,
}

impl Source for CityGMLSource {
fn run(&mut self, sink: Sender, feedback: &Feedback) {
let pool = ThreadPoolBuilder::new().build().unwrap();
pool.install(|| {
self.filenames.par_iter().for_each(|filename| {
println!("loading city objects from: {} ...", filename);

let Ok(file) = std::fs::File::open(filename) else {
panic!("failed to open file {}", filename);
};
let reader = std::io::BufReader::new(file);
let mut xml_reader = quick_xml::NsReader::from_reader(reader);
match CityGMLReader::new().start_root(&mut xml_reader) {
Ok(mut st) => match toplevel_dispatcher(&mut st, &sink, feedback) {
Ok(size) => size,
Err(e) => panic!("Err: {:?}", e),
},
Err(e) => panic!("Err: {:?}", e),
};
});
})
}
#[derive(clap::ValueEnum, Clone)]
enum SinkChoice {
Noop,
Serde,
}

fn toplevel_dispatcher<R: BufRead>(
st: &mut SubTreeReader<R>,
sink: &Sender,
feedback: &Feedback,
) -> Result<(), ParseError> {
match st.parse_children(|st| {
if feedback.is_cancelled() {
return Ok(());
impl SinkChoice {
fn create(&self) -> Box<dyn DataSinkProvider> {
match self {
SinkChoice::Noop => Box::new(NoopSinkProvider {}),
SinkChoice::Serde => Box::new(SerdeSinkProvider {}),
}

match st.current_path() {
b"core:cityObjectMember" => {
let mut cityobj: nusamai_plateau::models::CityObject = Default::default();
cityobj.parse(st)?;
let geometries = st.collect_geometries();

if let Some(root) = cityobj.into_object() {
let cityobj = TopLevelCityObject { root, geometries };
if sink.send(Percel { cityobj }).is_err() {
return Ok(());
}
}

Ok(())
}
b"gml:boundedBy" | b"app:appearanceMember" => {
st.skip_current_element()?;
Ok(())
}
other => Err(ParseError::SchemaViolation(format!(
"Unrecognized element {}",
String::from_utf8_lossy(other)
))),
}
}) {
Ok(_) => Ok(()),
Err(e) => {
println!("Err: {:?}", e);
Err(e)
}
}
}

struct NoopTransformer {}

impl Transformer for NoopTransformer {
fn transform(
&self,
percel: Percel,
sender: &Sender,
_feedback: &feedback::Feedback,
) -> Result<(), TransformError> {
// no-op
sender.send(percel)?;
Ok(())
}
}

struct DummySinkProvider {}
fn main() {
let args = Args::parse();

impl SinkProvider for DummySinkProvider {
fn create(&self, _config: &Config) -> Box<dyn Sink> {
Box::new(DummySink {
num_features: 0,
num_vertices: 0,
let mut canceller = Arc::new(Mutex::new(Canceller::default()));
{
let canceller = canceller.clone();
ctrlc::set_handler(move || {
println!("request cancellation");
canceller.lock().unwrap().cancel();
})
.expect("Error setting Ctrl-C handler");
}

fn info(&self) -> SinkInfo {
SinkInfo {
name: "Noop Sink".to_string(),
}
}

fn config(&self) -> Config {
Config::default()
}
}

struct DummySink {
num_features: usize,
num_vertices: usize,
}
let source_provider: Box<dyn DataSourceProvider> = Box::new(CityGMLSourceProvider {
filenames: args.filenames,
});
let sink_provider = args.sink.create();

impl Sink for DummySink {
fn receive(&mut self, percel: Percel, _feedback: &mut Feedback) {
self.num_features += 1;
self.num_vertices += percel.cityobj.geometries.vertices.len();
}
let source = source_provider.create(&source_provider.config());
let sink = sink_provider.create(&sink_provider.config());

fn finalize(&mut self, _feedback: &mut Feedback) {
println!("total number of features: {:#?}", self.num_features);
println!("total vertices: {}", self.num_vertices);
}
run(source, sink, &mut canceller);
}

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg()]
filenames: Vec<String>,
}

fn main() {
let args = Args::parse();

let input_driver_factory: Box<dyn SourceProvider> = Box::new(CityGMLSourceProvider {
filenames: args.filenames,
});
let output_driver_factory: Box<dyn SinkProvider> = Box::new(DummySinkProvider {});

let input_driver = input_driver_factory.create(&input_driver_factory.config());
fn run(
source: Box<dyn DataSource>,
sink: Box<dyn DataSink>,
canceller: &mut Arc<Mutex<Canceller>>,
) {
let transformer = Box::new(NoopTransformer {});
let output_driver = output_driver_factory.create(&input_driver_factory.config());

// start the pipeline
let (handle, watcher, _canceller) = pipeline::run(input_driver, transformer, output_driver);
let (handle, watcher, inner_canceller) = nusamai::pipeline::run(source, transformer, sink);
*canceller.lock().unwrap() = inner_canceller;

std::thread::scope(|scope| {
// log watcher
Expand All @@ -192,4 +80,7 @@ fn main() {

// wait for the pipeline to finish
handle.join();
if canceller.lock().unwrap().is_cancelled() {
println!("Pipeline cancelled");
}
}
8 changes: 7 additions & 1 deletion nusamai/src/pipeline/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Feedback {

/// Send a message to the feedback channel
#[inline]
pub fn send(&self, msg: FeedbackMessage) {
pub fn feedback(&self, msg: FeedbackMessage) {
// don't care if the receiver is dropped.
let _ = self.sender.send(msg);
}
Expand All @@ -48,6 +48,7 @@ impl IntoIterator for Watcher {
}
}

#[derive(Clone, Default)]
pub struct Canceller {
cancelled: Arc<AtomicBool>,
}
Expand All @@ -57,6 +58,11 @@ impl Canceller {
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::Relaxed);
}

/// Checks if the pipeline is cancelled
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
}

pub(crate) fn watcher() -> (Watcher, Feedback, Canceller) {
Expand Down
10 changes: 3 additions & 7 deletions nusamai/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
pub mod feedback;
pub mod runner;
pub mod sink;
pub mod source;
pub mod transform;

pub use feedback::*;
pub use runner::*;
pub use sink::*;
pub use source::*;
pub use transform::*;

use std::sync::mpsc;

pub type Sender = mpsc::SyncSender<Percel>;
pub type Receiver = mpsc::Receiver<Percel>;
pub type Sender = mpsc::SyncSender<Parcel>;
pub type Receiver = mpsc::Receiver<Parcel>;

/// Message passing through pipeline stages
#[derive(Debug)]
pub struct Percel {
pub struct Parcel {
pub cityobj: nusamai_plateau::TopLevelCityObject,
}
Loading

0 comments on commit 5b0e325

Please sign in to comment.