Skip to content

Commit

Permalink
fixed remote error logging and changed services to use serperate ser…
Browse files Browse the repository at this point in the history
…vice discovery entry
  • Loading branch information
brichards64 committed Aug 27, 2024
1 parent 4a49290 commit 0abdb06
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 22 deletions.
2 changes: 1 addition & 1 deletion configfiles/Dummy/ToolChainConfig
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ service_publish_sec 5 # heartbeat send period
service_kick_sec 60 # remove hosts with no heartbeat after given period

###### Backend Services ##### NEWLY ADDED NEEDS TO HAVE PROPER DESCRIPTIONS AND SOME PRUNING BEFORE RELEASE
use_backed_services 1 #
use_backend_services 1 #
db_name daq #
verbosity 1 #
max_retries 3 #
Expand Down
2 changes: 1 addition & 1 deletion configfiles/template/ToolChainConfig
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ service_publish_sec 5 # heartbeat send period
service_kick_sec 60 # remove hosts with no heartbeat after given period

###### Backend Services ##### NEWLY ADDED NEEDS TO HAVE PROPER DESCRIPTIONS AND SOME PRUNING BEFORE RELEASE
use_backed_services 1 #
use_backend_services 1 #
db_name daq #
verbosity 1 #
max_retries 3 #
Expand Down
34 changes: 18 additions & 16 deletions src/DAQLogging/DAQLogging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ using namespace ToolFramework;

DAQLogging::TDAQStreamBuf::~TDAQStreamBuf(){

if(m_remote && (!m_error)){
zmq::message_t Send(5);
snprintf ((char *) Send.data(), 5 , "%s" ,"Quit");
LogSender->send(Send);

pthread_join(thread, NULL);
if(m_remote){
if(!m_error){
zmq::message_t Send(5);
snprintf ((char *) Send.data(), 5 , "%s" ,"Quit");
LogSender->send(Send);

pthread_join(thread, NULL);
}
delete LogSender;
LogSender=0;
delete args;
Expand Down Expand Up @@ -66,11 +68,11 @@ DAQLogging::TDAQStreamBuf::TDAQStreamBuf(zmq::context_t *context, boost::uuids::
if(m_interactive) output=new std::ostream(backup1);


if(m_remote && !m_error){
if(m_remote){

args=new DAQLogging_thread_args(m_context, UUID , log_address, log_port, m_service);

pthread_create (&thread, NULL, DAQLogging::TDAQStreamBuf::RemoteThread, args); // make pushthread with two socets one going out one comming in and buffer socket
if(!m_error) pthread_create (&thread, NULL, DAQLogging::TDAQStreamBuf::RemoteThread, args); // make pushthread with two socets one going out one comming in and buffer socket

LogSender = new zmq::socket_t(*context, ZMQ_PUSH);
LogSender->connect("inproc://LogSender");
Expand Down Expand Up @@ -116,7 +118,7 @@ int DAQLogging::TDAQStreamBuf::sync (){
timeinfo = localtime(&rawtime);
strftime(buffer,80,"%d-%m-%Y %I:%M:%S",timeinfo);
std::string t(buffer);

if(m_local){
if(m_error)(*fileoutput)<<red;
(*fileoutput)<< "{"<<t<<"} [";
Expand All @@ -126,7 +128,7 @@ int DAQLogging::TDAQStreamBuf::sync (){
if(m_error)(*fileoutput)<<plain;
fileoutput->flush();
}

if(m_interactive){
if(m_error) (*output)<<red;
(*output)<<"[";
Expand All @@ -136,27 +138,27 @@ int DAQLogging::TDAQStreamBuf::sync (){
if(m_error) (*output)<<plain;
output->flush();
}

if (m_remote){

zmq::pollitem_t items[] = {
{ *LogSender, 0, ZMQ_POLLOUT, 0 }
};

// printf("starting poll \n");
zmq::poll(&items[0], 1, 1000);
// printf("finnished poll \n");

if (items[0].revents & ZMQ_POLLOUT) {
std::stringstream tmp;
//printf("in sync send \n");
tmp << "{"<<m_service<<":"<<t<<"} ["<<m_messagelevel<<"]: "<< str();
std::string line=tmp.str();

zmq::message_t Send(line.length()+1);
snprintf ((char *) Send.data(), line.length()+1 , "%s" ,line.c_str());
//printf(" sync sending %s\n",line.c_str());
//printf(" sync sending %s\n",line.c_str());
LogSender->send(Send);
//printf("sync sent \n");
//printf("sync sent \n");
}

}
Expand Down
4 changes: 2 additions & 2 deletions src/ServiceDiscovery/Services.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ Services::~Services(){

}

bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in){
bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service){

m_context = context_in;
sc_vars = sc_vars_in;
sc_vars->InitThreadedReceiver(m_context, 60000, 100, false);
sc_vars->InitThreadedReceiver(m_context, 60000, 100, new_service);
m_backend_client.SetUp(m_context);

if(!m_variables.Get("service_name",m_name)) m_name="test_service";
Expand Down
2 changes: 1 addition & 1 deletion src/ServiceDiscovery/Services.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace ToolFramework {

Services();
~Services();
bool Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in);
bool Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service=false);

bool SQLQuery(const std::string& database, const std::string& query, std::vector<std::string>* responses=nullptr, const unsigned int timeout=300);
bool SQLQuery(const std::string& database, const std::string& query, std::string* response=nullptr, const unsigned int timeout=300);
Expand Down
2 changes: 1 addition & 1 deletion src/ToolDAQChain/ToolDAQChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void ToolDAQChain::Init(unsigned int IO_Threads){

if(m_backend_services){
m_DAQdata->services= new Services();
m_DAQdata->services->Init(m_data->vars, m_DAQdata->context, &m_DAQdata->sc_vars);
m_DAQdata->services->Init(m_data->vars, m_DAQdata->context, &m_DAQdata->sc_vars, true);
}

}
Expand Down

0 comments on commit 0abdb06

Please sign in to comment.