Skip to content

Commit

Permalink
Refactor DipMessagesProcessor - Initial code cleaning (#5)
Browse files Browse the repository at this point in the history
* Reorder functions

* Refactor DipMessagesProcessor - Initial code cleaning
  • Loading branch information
martinboulais authored Oct 8, 2024
1 parent a05596c commit 1c8dea8
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 252 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ AliDip2BK.jar
STATE
AliDip2BK.iml
RunsHistory
out/
68 changes: 38 additions & 30 deletions src/alice/dip/AliDip2BK.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,21 @@ public class AliDip2BK implements Runnable {
public static String KAFKAtopic_EOR = "aliecs.env_leave_state.RUNNING";
public static String KAFKA_group_id = "AliDip";
public static String STORE_HIST_FILE_DIR = "HistFiles";
public static boolean SIMULATE_DIP_EVENTS = false;
private static boolean simulateDipEvents = false;
public static SimpleDateFormat myDateFormat = new SimpleDateFormat("dd-MM-yy HH:mm");
public static SimpleDateFormat logDateFormat = new SimpleDateFormat("dd-MM HH:mm:ss");
public static double DIFF_ENERGY = 5;
public static double DIFF_BETA = 0.001;
public static double DIFF_CURRENT = 5;
public static String ProgPath;
private final long startDate;
public String DipParametersFile = null;
String confFile = "AliDip2BK.properties";
DipClient client;
DipMessagesProcessor process;
DipMessagesProcessor dipMessagesProcessor;
BookkeepingClient bookkeepingClient;
StartOfRunKafkaConsumer kcs;
EndOfRunKafkaConsumer kce;
private long startDate;
private long stopDate;

public AliDip2BK() {
startDate = (new Date()).getTime();
Expand All @@ -66,19 +65,22 @@ public AliDip2BK() {
verifyDirs();

bookkeepingClient = new BookkeepingClient(bookkeepingUrl, bookkeepingToken);
process = new DipMessagesProcessor(bookkeepingClient);
dipMessagesProcessor = new DipMessagesProcessor(bookkeepingClient);
if (AliDip2BK.simulateDipEvents) {
new SimDipEventsFill(dipMessagesProcessor);
}

client = new DipClient(DipParametersFile, process);
client = new DipClient(DipParametersFile, dipMessagesProcessor);

try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}

kcs = new StartOfRunKafkaConsumer(process);
kcs = new StartOfRunKafkaConsumer(dipMessagesProcessor);

kce = new EndOfRunKafkaConsumer(process);
kce = new EndOfRunKafkaConsumer(dipMessagesProcessor);

shutdownProc();

Expand All @@ -95,8 +97,7 @@ static public void log(int level, String module, String mess) {
}

public static void main(String[] args) {
@SuppressWarnings("unused")
AliDip2BK service = new AliDip2BK();
@SuppressWarnings("unused") AliDip2BK service = new AliDip2BK();
}

public void run() {
Expand All @@ -123,25 +124,25 @@ public void shutdownProc() {
public void run() {
log(4, "AliDip2BK", " Main class ENTERS in Shutdown hook");
client.closeSubscriptions();
process.closeInputQueue();
if (process.QueueSize() > 0) {
dipMessagesProcessor.closeInputQueue();
if (dipMessagesProcessor.queueSize() > 0) {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}

if (process.QueueSize() == 0) break;
if (dipMessagesProcessor.queueSize() == 0) break;
}
}

if (process.QueueSize() != 0) {
if (dipMessagesProcessor.queueSize() != 0) {
log(4, "AliDip2BK Shutdown", " Data Proc queue is not EMPTY ! Close it anyway ");
} else {
log(2, "AliDip2BK Shutdown", " Data Proc queue is EMPTY and it was correctly closed ");
}
process.saveState();
dipMessagesProcessor.saveState();
writeStat("AliDip2BK.stat", true);
}
});
Expand Down Expand Up @@ -183,7 +184,11 @@ private void loadConf(String filename) {

DipParametersFile = ProgPath + para_file_name;
} else {
log(4, "AliDip2BK.loadConf", " Dip Data Providers Subscription file name is undefined in the conf file ");
log(
4,
"AliDip2BK.loadConf",
" Dip Data Providers Subscription file name is undefined in the conf file "
);
}

String list_param = prop.getProperty("ListDataProvidersPattern");
Expand All @@ -193,7 +198,11 @@ private void loadConf(String filename) {
LIST_PARAM = true;
LIST_PARAM_PAT = list_param;
} else {
log(4, "AliDip2BK.loadConf ", " List DIP Data Providers Pattern is undefined ! The DIP broswer will not start ");
log(
4,
"AliDip2BK.loadConf ",
" List DIP Data Providers Pattern is undefined ! The DIP broswer will not start "
);
}

String debug_n = prop.getProperty("DEBUG_LEVEL");
Expand All @@ -211,8 +220,7 @@ private void loadConf(String filename) {
String keh = prop.getProperty("SAVE_PARAMETERS_HISTORY_PER_RUN");
if (keh != null) {
keh = keh.trim();
SAVE_PARAMETERS_HISTORY_PER_RUN = false;
if (keh.equalsIgnoreCase("Y")) SAVE_PARAMETERS_HISTORY_PER_RUN = true;
SAVE_PARAMETERS_HISTORY_PER_RUN = keh.equalsIgnoreCase("Y");
if (keh.equalsIgnoreCase("YES")) SAVE_PARAMETERS_HISTORY_PER_RUN = true;
if (keh.equalsIgnoreCase("true")) SAVE_PARAMETERS_HISTORY_PER_RUN = true;
}
Expand All @@ -230,9 +238,9 @@ private void loadConf(String filename) {
String sde = prop.getProperty("SIMULATE_DIP_EVENTS");
if (sde != null) {

if (sde.equalsIgnoreCase("Y")) SIMULATE_DIP_EVENTS = true;
if (sde.equalsIgnoreCase("YES")) SIMULATE_DIP_EVENTS = true;
if (sde.equalsIgnoreCase("true")) SIMULATE_DIP_EVENTS = true;
if (sde.equalsIgnoreCase("Y")) simulateDipEvents = true;
if (sde.equalsIgnoreCase("YES")) simulateDipEvents = true;
if (sde.equalsIgnoreCase("true")) simulateDipEvents = true;
}

String kgid = prop.getProperty("KAFKA_group_id");
Expand Down Expand Up @@ -275,7 +283,7 @@ private void loadConf(String filename) {
public void writeStat(String file, boolean final_report) {
String full_file = ProgPath + AliDip2BK.KEEP_STATE_DIR + file;

stopDate = (new Date()).getTime();
var stopDate = (new Date()).getTime();
double dur = (double) (stopDate - startDate) / (1000 * 60 * 60);

Runtime rt = Runtime.getRuntime();
Expand All @@ -288,14 +296,14 @@ public void writeStat(String file, boolean final_report) {
}
mess = mess + " Duration [h]=" + dur + "\n";
mess = mess + " Memory Used [MB]=" + usedMB + "\n";
mess = mess + " No of DIP messages=" + process.statNoDipMess + "\n";
mess = mess + " No of KAFKA messages=" + process.statNoKafMess + "\n";
mess = mess + " No of DIP messages=" + dipMessagesProcessor.statNoDipMess + "\n";
mess = mess + " No of KAFKA messages=" + dipMessagesProcessor.statNoKafMess + "\n";
mess = mess + " No of KAFKA SOR messages=" + kcs.NoMess + "\n";
mess = mess + " No of KAFKA EOR messages=" + kce.NoMess + "\n";
mess = mess + " No of new Fill messgaes =" + process.statNoNewFills + "\n";
mess = mess + " No of new Run messgaes =" + process.statNoNewRuns + "\n";
mess = mess + " No of end Run messages =" + process.statNoEndRuns + "\n";
mess = mess + " No of Duplicated end Run messages =" + process.statNoDuplicateEndRuns + "\n";
mess = mess + " No of new Fill messgaes =" + dipMessagesProcessor.statNoNewFills + "\n";
mess = mess + " No of new Run messgaes =" + dipMessagesProcessor.statNoNewRuns + "\n";
mess = mess + " No of end Run messages =" + dipMessagesProcessor.statNoEndRuns + "\n";
mess = mess + " No of Duplicated end Run messages =" + dipMessagesProcessor.statNoDuplicateEndRuns + "\n";

try {
File of = new File(full_file);
Expand All @@ -322,7 +330,7 @@ public void verifyDirs() {
public void verifyDir(String name) {
if (name != null) {

File directory = new File(String.valueOf(ProgPath + "/" + name));
File directory = new File(ProgPath + "/" + name);

if (!directory.exists()) {
directory.mkdir();
Expand Down
1 change: 1 addition & 0 deletions src/alice/dip/BookkeepingClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* This class is used to write the Dip information into the
* Bookkeeping Data Base
*/

package alice.dip;

import java.net.URI;
Expand Down
2 changes: 1 addition & 1 deletion src/alice/dip/DipClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void handleMessage(DipSubscription subscription, DipData message) {

NoMess = NoMess + 1;

procData.addData(p_name, ans, message);
procData.handleMessage(p_name, ans, message);

}

Expand Down
Loading

0 comments on commit 1c8dea8

Please sign in to comment.