Skip to content

Commit

Permalink
Replaced usage of BrokerConstants with IConfig constants (#765)
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel authored Aug 1, 2023
1 parent 2a7b505 commit 0b6b1c0
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 68 deletions.
43 changes: 24 additions & 19 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.config.*;
import io.moquette.broker.config.FileResourceLoader;
import io.moquette.broker.config.FluentConfig;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.IResourceLoader;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.broker.config.ResourceLoaderConfig;
import io.moquette.broker.security.ACLFileParser;
import io.moquette.broker.security.AcceptAllAuthenticator;
import io.moquette.broker.security.DenyAllAuthorizatorPolicy;
Expand Down Expand Up @@ -198,20 +203,20 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler

if (config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME) != null) {
LOG.warn("Using a deprecated setting {} please update to {}",
BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, BrokerConstants.DATA_PATH_PROPERTY_NAME);
LOG.warn("Forcing {} to true", BrokerConstants.PERSISTENCE_ENABLED_PROPERTY_NAME);
config.setProperty(BrokerConstants.PERSISTENCE_ENABLED_PROPERTY_NAME, Boolean.TRUE.toString());
BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, IConfig.DATA_PATH_PROPERTY_NAME);
LOG.warn("Forcing {} to true", IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME);
config.setProperty(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME, Boolean.TRUE.toString());

final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
final String dataPath = persistencePath.substring(0, persistencePath.lastIndexOf("/"));
LOG.warn("Forcing {} to {}", BrokerConstants.DATA_PATH_PROPERTY_NAME, dataPath);
config.setProperty(BrokerConstants.DATA_PATH_PROPERTY_NAME, dataPath);
LOG.warn("Forcing {} to {}", IConfig.DATA_PATH_PROPERTY_NAME, dataPath);
config.setProperty(IConfig.DATA_PATH_PROPERTY_NAME, dataPath);
}

final Clock clock = Clock.systemDefaultZone();

if (Boolean.parseBoolean(config.getProperty(BrokerConstants.PERSISTENCE_ENABLED_PROPERTY_NAME))) {
final Path dataPath = Paths.get(config.getProperty(BrokerConstants.DATA_PATH_PROPERTY_NAME));
if (Boolean.parseBoolean(config.getProperty(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME))) {
final Path dataPath = Paths.get(config.getProperty(IConfig.DATA_PATH_PROPERTY_NAME));
if (!dataPath.toFile().exists()) {
if (dataPath.toFile().mkdirs()) {
LOG.debug("Created data_path {} folder", dataPath);
Expand Down Expand Up @@ -241,13 +246,13 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
final Authorizator authorizator = new Authorizator(authorizatorPolicy);

final int globalSessionExpiry;
if (config.getProperty(BrokerConstants.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME) != null) {
globalSessionExpiry = (int) config.durationProp(BrokerConstants.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME).toMillis() / 1000;
if (config.getProperty(IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME) != null) {
globalSessionExpiry = (int) config.durationProp(IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME).toMillis() / 1000;
} else {
globalSessionExpiry = INFINITE_EXPIRY;
}

final int sessionQueueSize = config.intProp(BrokerConstants.SESSION_QUEUE_SIZE, 1024);
final int sessionQueueSize = config.intProp(IConfig.SESSION_QUEUE_SIZE, 1024);
final SessionEventLoopGroup loopsGroup = new SessionEventLoopGroup(interceptor, sessionQueueSize);
sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator, scheduler,
clock, globalSessionExpiry, loopsGroup);
Expand All @@ -264,7 +269,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
final long startTime = System.currentTimeMillis() - start;
LOG.info("Moquette integration has been started successfully in {} ms", startTime);

if (config.boolProp(BrokerConstants.ENABLE_TELEMETRY_NAME, true)) {
if (config.boolProp(IConfig.ENABLE_TELEMETRY_NAME, true)) {
collectAndSendTelemetryDataAsynch(config);
}

Expand All @@ -273,7 +278,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler

private static IQueueRepository initQueuesRepository(IConfig config, Path dataPath, H2Builder h2Builder) throws IOException {
final IQueueRepository queueRepository;
final String queueType = config.getProperty(BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME);
final String queueType = config.getProperty(IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME);
if ("h2".equalsIgnoreCase(queueType)) {
LOG.info("Configuring H2 queue store");
queueRepository = h2Builder.queueRepository();
Expand All @@ -287,7 +292,7 @@ private static IQueueRepository initQueuesRepository(IConfig config, Path dataPa
throw new IOException("Problem in configuring persistent queue on path " + dataPath, e);
}
} else {
final String errMsg = String.format("Invalid property for %s found [%s] while only h2 or segmented are admitted", BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, queueType);
final String errMsg = String.format("Invalid property for %s found [%s] while only h2 or segmented are admitted", IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, queueType);
throw new RuntimeException(errMsg);
}
return queueRepository;
Expand All @@ -314,7 +319,7 @@ private void collectAndSendTelemetryData(IConfig config) {
}

private String checkOrCreateUUID(IConfig config) {
final String storagePath = config.getProperty(BrokerConstants.DATA_PATH_PROPERTY_NAME, "");
final String storagePath = config.getProperty(IConfig.DATA_PATH_PROPERTY_NAME, "");
final Path uuidFilePath = Paths.get(storagePath, ".moquette_uuid");
if (Files.exists(uuidFilePath)) {
try {
Expand Down Expand Up @@ -444,13 +449,13 @@ private void sendTelemetryData(String telemetryDoc) throws IOException {

private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy authorizatorPolicy, IConfig props) {
LOG.debug("Configuring MQTT authorizator policy");
String authorizatorClassName = props.getProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, "");
String authorizatorClassName = props.getProperty(IConfig.AUTHORIZATOR_CLASS_NAME, "");
if (authorizatorPolicy == null && !authorizatorClassName.isEmpty()) {
authorizatorPolicy = loadClass(authorizatorClassName, IAuthorizatorPolicy.class, IConfig.class, props);
}

if (authorizatorPolicy == null) {
String aclFilePath = props.getProperty(BrokerConstants.ACL_FILE_PROPERTY_NAME, "");
String aclFilePath = props.getProperty(IConfig.ACL_FILE_PROPERTY_NAME, "");
if (aclFilePath != null && !aclFilePath.isEmpty()) {
authorizatorPolicy = new DenyAllAuthorizatorPolicy();
try {
Expand All @@ -470,15 +475,15 @@ private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy aut

private IAuthenticator initializeAuthenticator(IAuthenticator authenticator, IConfig props) {
LOG.debug("Configuring MQTT authenticator");
String authenticatorClassName = props.getProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, "");
String authenticatorClassName = props.getProperty(IConfig.AUTHENTICATOR_CLASS_NAME, "");

if (authenticator == null && !authenticatorClassName.isEmpty()) {
authenticator = loadClass(authenticatorClassName, IAuthenticator.class, IConfig.class, props);
}

IResourceLoader resourceLoader = props.getResourceLoader();
if (authenticator == null) {
String passwdPath = props.getProperty(BrokerConstants.PASSWORD_FILE_PROPERTY_NAME, "");
String passwdPath = props.getProperty(IConfig.PASSWORD_FILE_PROPERTY_NAME, "");
if (passwdPath.isEmpty()) {
authenticator = new AcceptAllAuthenticator();
} else {
Expand Down
21 changes: 10 additions & 11 deletions broker/src/main/java/io/moquette/broker/config/IConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,20 @@ public abstract class IConfig {
public abstract String getProperty(String name, String defaultValue);

void assignDefaults() {
setProperty(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(BrokerConstants.PORT));
setProperty(BrokerConstants.HOST_PROPERTY_NAME, BrokerConstants.HOST);
setProperty(PORT_PROPERTY_NAME, Integer.toString(BrokerConstants.PORT));
setProperty(HOST_PROPERTY_NAME, BrokerConstants.HOST);
// setProperty(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME,
// Integer.toString(BrokerConstants.WEBSOCKET_PORT));
setProperty(BrokerConstants.PASSWORD_FILE_PROPERTY_NAME, "");
setProperty(PASSWORD_FILE_PROPERTY_NAME, "");
// setProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME,
// BrokerConstants.DEFAULT_PERSISTENT_PATH);
setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
setProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, "");
setProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, "");
setProperty(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME,
String.valueOf(BrokerConstants.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE));
setProperty(BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, "segmented");
setProperty(BrokerConstants.DATA_PATH_PROPERTY_NAME, "data/");
setProperty(BrokerConstants.PERSISTENCE_ENABLED_PROPERTY_NAME, Boolean.TRUE.toString());
setProperty(ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
setProperty(AUTHENTICATOR_CLASS_NAME, "");
setProperty(AUTHORIZATOR_CLASS_NAME, "");
setProperty(NETTY_MAX_BYTES_PROPERTY_NAME, String.valueOf(DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE));
setProperty(PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, "segmented");
setProperty(DATA_PATH_PROPERTY_NAME, "data/");
setProperty(PERSISTENCE_ENABLED_PROPERTY_NAME, Boolean.TRUE.toString());
}

public abstract IResourceLoader getResourceLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.nio.file.Path;
import java.util.Properties;

import static io.moquette.BrokerConstants.ENABLE_TELEMETRY_NAME;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ConfigurationClassLoaderTest implements IAuthenticator, IAuthorizatorPolicy {
Expand Down Expand Up @@ -63,17 +62,17 @@ public void tearDown() {
@Test
public void loadAuthenticator() throws Exception {
Properties props = new Properties(IntegrationUtils.prepareTestProperties(dbPath));
props.setProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, getClass().getName());
props.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
props.setProperty(IConfig.AUTHENTICATOR_CLASS_NAME, getClass().getName());
props.setProperty(IConfig.ENABLE_TELEMETRY_NAME, "false");
startServer(props);
assertTrue(true);
}

@Test
public void loadAuthorizator() throws Exception {
Properties props = new Properties(IntegrationUtils.prepareTestProperties(dbPath));
props.setProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, getClass().getName());
props.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
props.setProperty(IConfig.AUTHORIZATOR_CLASS_NAME, getClass().getName());
props.setProperty(IConfig.ENABLE_TELEMETRY_NAME, "false");
startServer(props);
assertTrue(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import java.nio.file.Path;
import java.util.Properties;

import static io.moquette.BrokerConstants.DATA_PATH_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.DATA_PATH_PROPERTY_NAME;
import static io.moquette.BrokerConstants.DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME;
import static io.moquette.BrokerConstants.ENABLE_TELEMETRY_NAME;
import static io.moquette.BrokerConstants.PERSISTENCE_ENABLED_PROPERTY_NAME;
import static io.moquette.BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME;
import static io.moquette.BrokerConstants.PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.ENABLE_TELEMETRY_NAME;
import static io.moquette.broker.config.IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME;

/**
* Used to carry integration configurations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class PublishToManySubscribersUseCaseTest extends AbstractIntegration {
protected void startServer(String dbPath) throws IOException {
broker = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
configProps.put(BrokerConstants.SESSION_QUEUE_SIZE, Integer.toString(COMMAND_QUEUE_SIZE));
configProps.put(IConfig.SESSION_QUEUE_SIZE, Integer.toString(COMMAND_QUEUE_SIZE));
IConfig brokerConfig = new MemoryConfig(configProps);
broker.startServer(brokerConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private void stopServer() {
}

private Properties addDBAuthenticatorConf(Properties properties) {
properties.put(BrokerConstants.AUTHENTICATOR_CLASS_NAME, DBAuthenticator.class.getCanonicalName());
properties.put(IConfig.AUTHENTICATOR_CLASS_NAME, DBAuthenticator.class.getCanonicalName());
properties.put(BrokerConstants.DB_AUTHENTICATOR_DRIVER, DBAuthenticatorTest.ORG_H2_DRIVER);
properties.put(BrokerConstants.DB_AUTHENTICATOR_URL, DBAuthenticatorTest.JDBC_H2_MEM_TEST);
properties.put(BrokerConstants.DB_AUTHENTICATOR_QUERY, "SELECT PASSWORD FROM ACCOUNT WHERE LOGIN=?");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.io.IOException;
import java.util.Properties;

import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslProvider;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -48,17 +48,17 @@ protected void startServer() throws IOException {
m_server = new Server();
Properties sslProps = new Properties();

sslProps.put(BrokerConstants.SSL_PROVIDER, SslProvider.OPENSSL.name());
sslProps.put(IConfig.SSL_PROVIDER, SslProvider.OPENSSL.name());
// sslProps.put(BrokerConstants.NEED_CLIENT_AUTH, "true");

sslProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883");
sslProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/serverkeystore.jks");
sslProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.DATA_PATH_PROPERTY_NAME, dbPath);
sslProps.put(BrokerConstants.PERSISTENCE_ENABLED_PROPERTY_NAME, "true");
sslProps.put(IConfig.SSL_PORT_PROPERTY_NAME, "8883");
sslProps.put(IConfig.JKS_PATH_PROPERTY_NAME, "src/test/resources/serverkeystore.jks");
sslProps.put(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(IConfig.DATA_PATH_PROPERTY_NAME, dbPath);
sslProps.put(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME, "true");

sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
sslProps.put(IConfig.ENABLE_TELEMETRY_NAME, "false");
m_server.startServer(sslProps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ protected void startServer(String dbPath) throws IOException {
m_server = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
configProps.setProperty(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, "true");
configProps.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
configProps.setProperty(BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, "segmented");
configProps.setProperty(IConfig.ENABLE_TELEMETRY_NAME, "false");
configProps.setProperty(IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME, "segmented");
m_config = new MemoryConfig(configProps);
canRead = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import io.moquette.BrokerConstants;
import io.moquette.broker.config.IConfig;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
Expand Down Expand Up @@ -177,14 +178,14 @@ protected void startServer(String dbPath) throws IOException {
m_server = new Server();

Properties sslProps = new Properties();
sslProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883");
sslProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/serverkeystore.jks");
sslProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.DATA_PATH_PROPERTY_NAME, dbPath);
sslProps.put(BrokerConstants.PERSISTENCE_ENABLED_PROPERTY_NAME, "true");
sslProps.put(IConfig.SSL_PORT_PROPERTY_NAME, "8883");
sslProps.put(IConfig.JKS_PATH_PROPERTY_NAME, "src/test/resources/serverkeystore.jks");
sslProps.put(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(IConfig.DATA_PATH_PROPERTY_NAME, dbPath);
sslProps.put(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME, "true");
sslProps.put(BrokerConstants.NEED_CLIENT_AUTH, "true");
sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
sslProps.put(IConfig.ENABLE_TELEMETRY_NAME, "false");
m_server.startServer(sslProps);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import io.moquette.BrokerConstants;
import io.moquette.broker.config.IConfig;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
Expand Down Expand Up @@ -109,13 +109,13 @@ protected void startServer() throws IOException {
m_server = new Server();

Properties sslProps = new Properties();
sslProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883");
sslProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/serverkeystore.jks");
sslProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.DATA_PATH_PROPERTY_NAME, dbPath);
sslProps.put(BrokerConstants.PERSISTENCE_ENABLED_PROPERTY_NAME, "true");
sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
sslProps.put(IConfig.SSL_PORT_PROPERTY_NAME, "8883");
sslProps.put(IConfig.JKS_PATH_PROPERTY_NAME, "src/test/resources/serverkeystore.jks");
sslProps.put(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(IConfig.DATA_PATH_PROPERTY_NAME, dbPath);
sslProps.put(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME, "true");
sslProps.put(IConfig.ENABLE_TELEMETRY_NAME, "false");
m_server.startServer(sslProps);
}

Expand Down

0 comments on commit 0b6b1c0

Please sign in to comment.