Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix ThreadPool::with_name warning #10

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ log = "0.3"
log4rs = "0.6"
docopt = "0.7"
getopts = "0.2"
chrono = { version = "0.3", features = ["serde"] }
chrono = { version = "0.4.5", features = ["serde"] }
lazy_static = "0.2"
regex = "0.2"
url = "1.4"
rust-crypto = "^0.2"
threadpool = "1.3"
iron = "0.5"
router = "0.5"
bodyparser = "0.6"
persistent = "0.3"
logger = "0.3"
iron = "0.6"
router = "0.6"
bodyparser = "0.8"
persistent = "0.4"
logger = "0.4"
rustc-serialize = "0.3"
serde = "0.9"
serde_derive = "0.9"
serde_json = "0.9"
consul = "0.0.6"
serde = "1.0.71"
serde_derive = "1.0.71"
serde_json = "1.0.26"
consul = {git = "https://github.com/stusmall/consul-rust.git"}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we depend on a specific version?

Copy link
Author

@robbinhan robbinhan Aug 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

  1. chrono need new version and some packages need to update together
  2. consul client is not support new consul server
  3. Keep updating

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, factotum binary is my mistake, I will delete it.

base64 = "0.4"
Binary file added factotum
Binary file not shown.
11 changes: 8 additions & 3 deletions src/factotum_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use iron::typemap::Key;
use logger::Logger;
use persistent::{Read, State};
use threadpool::ThreadPool;
use chrono::prelude::Utc;

use Args;
use factotum_server::command::{CommandStore, Execution};
Expand Down Expand Up @@ -83,6 +84,7 @@ pub fn start(args: Args) -> Result<(), String> {
status: get "/status" => responder::status,
settings: post "/settings" => responder::settings,
submit: post "/submit" => responder::submit,
submit_new: post "/submit/new" => responder::submit,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a new route?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes the same task wants to execute multiple times, and you need to know the results of the run.

check: get "/check" => responder::check
);
let (logger_before, logger_after) = Logger::new(None);
Expand Down Expand Up @@ -113,7 +115,7 @@ pub fn start(args: Args) -> Result<(), String> {

pub fn trigger_worker_manager<T: 'static + Clone + Persistence + Send>(dispatcher: Dispatcher, persistence: T, command_store: &CommandStore) -> Result<(Sender<Dispatch>, JoinHandle<()>, ThreadPool), String> {
let (tx, rx) = mpsc::channel();
let primary_pool = ThreadPool::new_with_name("primary_pool".to_string(), dispatcher.max_workers);
let primary_pool = ThreadPool::with_name("primary_pool".to_string(), dispatcher.max_workers);

let join_handle = spawn_worker_manager(tx.clone(), rx, dispatcher.requests_queue, dispatcher.max_jobs, primary_pool.clone(), persistence, command_store.clone());

Expand Down Expand Up @@ -227,8 +229,11 @@ fn process_job_request<T: 'static + Persistence + Send>(requests_channel: Sender
cmd_args.extend_from_slice(request.factfile_args.as_slice());
match command_store.execute(cmd_path, cmd_args) {
Ok(output) => {
trace!("{}", output);
requests_channel.send(Dispatch::RequestComplete(request)).expect("Job requests channel receiver has been deallocated");
trace!("process_job_request output:{}", output);
let mut clone = request.clone();
clone.exec_output = output;
clone.end_time = Utc::now();
requests_channel.send(Dispatch::RequestComplete(clone)).expect("Job requests channel receiver has been deallocated");
},
Err(e) => {
error!("{}", e);
Expand Down
8 changes: 5 additions & 3 deletions src/factotum_server/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl ConsulPersistence {
}

fn client(&self) -> Client {
Client::new(&format!("{}:{}", self.host.clone(), self.port.clone()))
let address = format!("http://{}:{}", self.host.clone(), self.port.clone());
return Client::new(address)
}
}

Expand All @@ -61,13 +62,14 @@ impl Persistence for ConsulPersistence {

fn set_key(&self, key: &str, value: &str) -> ThreadResult<()> {
panic::catch_unwind(|| {
self.client().keystore.set_key(key.to_owned(), value.to_owned())
let _result = self.client().keystore.set_key(key.to_owned(), value.to_owned());
})
}

fn get_key(&self, key: &str) -> ThreadResult<Option<String>> {
panic::catch_unwind(|| {
self.client().keystore.get_key(key.to_owned())
let result = self.client().keystore.get_key(key.to_owned()).unwrap();
return result;
})
}

Expand Down
25 changes: 22 additions & 3 deletions src/factotum_server/responder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use factotum_server::dispatcher::{Dispatch, Query};
use factotum_server::persistence;
use factotum_server::persistence::{Persistence, JobState};
use factotum_server::server::{ServerManager, SettingsRequest, JobRequest, ValidationError};
use crypto::sha2::Sha256;
use crypto::digest::Digest;

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -299,9 +301,26 @@ fn process_valid_submission<T, U, F, G>(url: &Url, request_body: Result<Option<J
}
};

if is_job_queued_or_running(persistence, &mut validated_job_request) {
return (status::BadRequest, create_warn_response(url, "Job is already being processed"))
}
let mut path_have_new = url.path_segments().unwrap();
match path_have_new.nth(1) {
Some(new) => {
if new == "new" {
debug!("new task");
let mut job_digest = Sha256::new();
let rfc_start_time = validated_job_request.start_time.to_rfc3339();
job_digest.input_str(&*validated_job_request.job_id);
job_digest.input_str(&*rfc_start_time);
let new_job_id = job_digest.result_str();

validated_job_request.job_id = new_job_id;
}
},
None => {
if is_job_queued_or_running(persistence, &mut validated_job_request) {
return (status::BadRequest, create_warn_response(url, "Job is already being processed"))
}
}
};

// check queue size
if is_requests_queue_full(jobs_channel.clone()) {
Expand Down
52 changes: 37 additions & 15 deletions src/factotum_server/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use std::fmt;
use std::fs::File;
use std::io::Read;
use std::path::Path;
use chrono::{DateTime, UTC};
use chrono::DateTime;
use chrono::prelude::Utc;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use getopts::Options;
Expand All @@ -34,7 +35,7 @@ pub struct ServerManager {
pub ip: String,
pub port: u32,
pub state: String,
pub start_time: DateTime<UTC>,
pub start_time: DateTime<Utc>,
pub webhook_uri: String,
pub no_colour: bool,
pub max_stdouterr_size: Option<usize>,
Expand All @@ -46,7 +47,7 @@ impl ServerManager {
ip: if let Some(ip) = wrapped_ip { ip } else { ::IP_DEFAULT.to_string() },
port: if port > 0 && port <= 65535 { port } else { ::PORT_DEFAULT },
state: ::SERVER_STATE_RUN.to_string(),
start_time: UTC::now(),
start_time: Utc::now(),
webhook_uri: webhook_uri.to_string(),
no_colour: no_colour,
max_stdouterr_size: max_stdouterr_size,
Expand All @@ -62,7 +63,7 @@ impl ServerManager {
}

pub fn get_uptime(&self) -> String {
let uptime = UTC::now().signed_duration_since(self.start_time);
let uptime = Utc::now().signed_duration_since(self.start_time);
let seconds = uptime.num_seconds() % 60;
let minutes = uptime.num_minutes() % 60;
let hours = uptime.num_hours() % 24;
Expand All @@ -78,7 +79,19 @@ pub struct JobRequest {
pub job_id: String,
pub job_name: String,
pub factfile_path: String,
pub factfile_args: Vec<String>
pub factfile_args: Vec<String>,
#[serde(default)]
pub exec_output: String,
#[serde(default = "UtcTime::default")]
pub start_time: DateTime<Utc>,
#[serde(default = "UtcTime::default")]
pub end_time: DateTime<Utc>
}
struct UtcTime;
impl UtcTime {
fn default() -> DateTime<Utc> {
Utc::now()
}
}

impl JobRequest {
Expand All @@ -89,6 +102,9 @@ impl JobRequest {
job_name: job_name.to_owned(),
factfile_path: factfile_path.to_owned(),
factfile_args: factfile_args,
exec_output: String::new(),
start_time: Utc::now(),
end_time: Utc::now(),
}
}

Expand Down Expand Up @@ -144,17 +160,23 @@ impl JobRequest {
}

pub fn append_job_args(server: &ServerManager, job: &mut JobRequest) {
let factfile_args_map = get_tag_map(&job.factfile_args);

if server.webhook_uri != "" {
job.factfile_args.push("--webhook".to_string());
job.factfile_args.push(server.webhook_uri.clone());

// Only required with webhook
if let Some(max_bytes) = server.max_stdouterr_size.clone() {
job.factfile_args.push("--max-stdouterr-size".to_string());
job.factfile_args.push(max_bytes.to_string());
};
if !factfile_args_map.contains_key("--webhook") {
job.factfile_args.push("--webhook".to_string());
job.factfile_args.push(server.webhook_uri.clone());
}

if !factfile_args_map.contains_key("--max-stdouterr-size") {
// Only required with webhook
if let Some(max_bytes) = server.max_stdouterr_size.clone() {
job.factfile_args.push("--max-stdouterr-size".to_string());
job.factfile_args.push(max_bytes.to_string());
};
}
}
if server.no_colour {
if !factfile_args_map.contains_key("--no-colour") && server.no_colour {
job.factfile_args.push("--no-colour".to_string());
}
}
Expand Down Expand Up @@ -305,4 +327,4 @@ fn get_tag_map(args: &Vec<String>) -> HashMap<String, String> {
}

arg_map
}
}
6 changes: 3 additions & 3 deletions src/factotum_server/server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn create_new_server_manager() {
assert_eq!(server_manager.ip, "0.0.0.0");
assert_eq!(server_manager.port, 8080);
assert_eq!(server_manager.state, ::SERVER_STATE_RUN);
assert_eq!(server_manager.start_time.date(), UTC::today());
assert_eq!(server_manager.start_time.date(), Utc::today());
assert_eq!(server_manager.webhook_uri, "http://a.webhook.com/");
assert!(server_manager.no_colour);
}
Expand All @@ -43,13 +43,13 @@ fn server_manager_is_not_running() {
#[test]
fn server_manager_get_start_time() {
let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false, Some(10_000));
assert_eq!(server_manager.get_start_time(), UTC::now().format("%F %T %Z").to_string());
assert_eq!(server_manager.get_start_time(), Utc::now().format("%F %T %Z").to_string());
}

#[test]
fn server_manager_get_uptime() {
let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false, Some(10_000));
let uptime = UTC::now().signed_duration_since(server_manager.start_time);
let uptime = Utc::now().signed_duration_since(server_manager.start_time);
let seconds = uptime.num_seconds() % 60;
let minutes = uptime.num_minutes() % 60;
let hours = uptime.num_hours() % 24;
Expand Down