Skip to content

Commit

Permalink
Create a DSL to ease the creation of a Server instance (#761)
Browse files Browse the repository at this point in the history
Create a fluent class (FluentConfig) to instantiate, configure and run the server in one liner.
Instead of creating an instance of IConfig and invoking repeatedly put methods with some constants that has to be fetched from a pool.

Implementation of a DSL to ease the configuration of the Server without boring use of Properties and BrokerConstants. This simplify the embedding of the Server in other projects.
  • Loading branch information
andsel authored Jul 31, 2023
1 parent 8e816a3 commit 830e8e2
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 52 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.17-SNAPSHOT:
[feature] Introduced DSL FluentConfig to simplify Server's API instantiation #761.
[fix] resolved issue #633 of bad perfomance when adding many subscriptions to few topics, resolved in #758.
[fix] resolved issue #629 that originated from subscription trees wide and flat, resolved in #630
[dependency] updated Netty to 4.1.93 and tcnative to 2.0.61 (#755)
Expand Down
89 changes: 52 additions & 37 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.moquette;

import io.moquette.broker.config.IConfig;

import java.io.File;

public final class BrokerConstants {
Expand All @@ -29,45 +31,58 @@ public final class BrokerConstants {
* */
@Deprecated
public static final String PERSISTENT_STORE_PROPERTY_NAME = "persistent_store";
public static final String DATA_PATH_PROPERTY_NAME = "data_path";
public static final String PERSISTENT_QUEUE_TYPE_PROPERTY_NAME = "persistent_queue_type"; // h2 or segmented, default h2
public static final String PERSISTENCE_ENABLED_PROPERTY_NAME = "persistence_enabled"; // true or false, default true
@Deprecated
public static final String DATA_PATH_PROPERTY_NAME = IConfig.DATA_PATH_PROPERTY_NAME;
@Deprecated
public static final String PERSISTENT_QUEUE_TYPE_PROPERTY_NAME = IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME;
@Deprecated
public static final String PERSISTENCE_ENABLED_PROPERTY_NAME = IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME;
public static final String SEGMENTED_QUEUE_PAGE_SIZE = "queue_page_size";
public static final int MB = 1024 * 1024;
public static final int DEFAULT_SEGMENTED_QUEUE_PAGE_SIZE = 64 * MB;
public static final String SEGMENTED_QUEUE_SEGMENT_SIZE = "queue_segment_size";
public static final int DEFAULT_SEGMENTED_QUEUE_SEGMENT_SIZE = 4 * MB;
public static final String AUTOSAVE_INTERVAL_PROPERTY_NAME = "autosave_interval";
public static final String PASSWORD_FILE_PROPERTY_NAME = "password_file";
public static final String PORT_PROPERTY_NAME = "port";
public static final String HOST_PROPERTY_NAME = "host";
@Deprecated
public static final String PASSWORD_FILE_PROPERTY_NAME = IConfig.PASSWORD_FILE_PROPERTY_NAME;
@Deprecated // use IConfig.PORT_PROPERTY_NAME
public static final String PORT_PROPERTY_NAME = IConfig.PORT_PROPERTY_NAME;
@Deprecated
public static final String HOST_PROPERTY_NAME = IConfig.HOST_PROPERTY_NAME;
public static final String DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME = "moquette_store.h2";
public static final String DEFAULT_PERSISTENT_PATH = System.getProperty("user.dir") + File.separator
+ DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME;
public static final String WEB_SOCKET_PORT_PROPERTY_NAME = "websocket_port";
public static final String WSS_PORT_PROPERTY_NAME = "secure_websocket_port";
public static final String WEB_SOCKET_PATH_PROPERTY_NAME = "websocket_path";
@Deprecated
public static final String WEB_SOCKET_PORT_PROPERTY_NAME = IConfig.WEB_SOCKET_PORT_PROPERTY_NAME;
@Deprecated
public static final String WSS_PORT_PROPERTY_NAME = IConfig.WSS_PORT_PROPERTY_NAME;
@Deprecated
public static final String WEB_SOCKET_PATH_PROPERTY_NAME = IConfig.WEB_SOCKET_PATH_PROPERTY_NAME;
public static final String WEB_SOCKET_MAX_FRAME_SIZE_PROPERTY_NAME = "websocket_max_frame_size";
public static final String SESSION_QUEUE_SIZE = "session_queue_size";

/**
* Defines the SSL implementation to use, default to "JDK".
* @see io.netty.handler.ssl.SslProvider#name()
*/
public static final String SSL_PROVIDER = "ssl_provider";
public static final String SSL_PORT_PROPERTY_NAME = "ssl_port";
public static final String JKS_PATH_PROPERTY_NAME = "jks_path";

/** @see java.security.KeyStore#getInstance(String) for allowed types, default to "jks" */
public static final String KEY_STORE_TYPE = "key_store_type";
public static final String KEY_STORE_PASSWORD_PROPERTY_NAME = "key_store_password";
public static final String KEY_MANAGER_PASSWORD_PROPERTY_NAME = "key_manager_password";
public static final String ALLOW_ANONYMOUS_PROPERTY_NAME = "allow_anonymous";
@Deprecated
public static final String SESSION_QUEUE_SIZE = IConfig.SESSION_QUEUE_SIZE;
@Deprecated
public static final String SSL_PROVIDER = IConfig.SSL_PROVIDER;
@Deprecated
public static final String SSL_PORT_PROPERTY_NAME = IConfig.SSL_PORT_PROPERTY_NAME;
@Deprecated
public static final String JKS_PATH_PROPERTY_NAME = IConfig.JKS_PATH_PROPERTY_NAME;
@Deprecated
public static final String KEY_STORE_TYPE = IConfig.KEY_STORE_TYPE;
@Deprecated
public static final String KEY_STORE_PASSWORD_PROPERTY_NAME = IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME;
@Deprecated
public static final String KEY_MANAGER_PASSWORD_PROPERTY_NAME = IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME;
@Deprecated
public static final String ALLOW_ANONYMOUS_PROPERTY_NAME = IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME;
public static final String REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT = "reauthorize_subscriptions_on_connect";
public static final String ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME = "allow_zero_byte_client_id";
public static final String ACL_FILE_PROPERTY_NAME = "acl_file";
public static final String AUTHORIZATOR_CLASS_NAME = "authorizator_class";
public static final String AUTHENTICATOR_CLASS_NAME = "authenticator_class";
@Deprecated
public static final String ACL_FILE_PROPERTY_NAME = IConfig.ACL_FILE_PROPERTY_NAME;
@Deprecated
public static final String AUTHORIZATOR_CLASS_NAME = IConfig.AUTHORIZATOR_CLASS_NAME;
@Deprecated
public static final String AUTHENTICATOR_CLASS_NAME = IConfig.AUTHENTICATOR_CLASS_NAME;
public static final String DB_AUTHENTICATOR_DRIVER = "authenticator.db.driver";
public static final String DB_AUTHENTICATOR_URL = "authenticator.db.url";
public static final String DB_AUTHENTICATOR_QUERY = "authenticator.db.query";
Expand All @@ -84,19 +99,17 @@ public final class BrokerConstants {
public static final String NETTY_SO_KEEPALIVE_PROPERTY_NAME = "netty.so_keepalive";
public static final String NETTY_CHANNEL_TIMEOUT_SECONDS_PROPERTY_NAME = "netty.channel_timeout.seconds";
public static final String NETTY_EPOLL_PROPERTY_NAME = "netty.epoll";
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = "netty.mqtt.message_size";
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092;
@Deprecated
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = IConfig.NETTY_MAX_BYTES_PROPERTY_NAME;
@Deprecated
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = IConfig.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE;
/**
* @deprecated use the BUFFER_FLUSH_MS_PROPERTY_NAME
* */
@Deprecated
public static final String IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME = "immediate_buffer_flush";
/**
* 0/immediate means immediate flush, like immediate_buffer_flush = true
* -1/full means no explicit flush, let Netty flush when write buffers are full, like immediate_buffer_flush = false
* a number of milliseconds to between flushes
* */
public static final String BUFFER_FLUSH_MS_PROPERTY_NAME = "buffer_flush_millis";
@Deprecated
public static final String BUFFER_FLUSH_MS_PROPERTY_NAME = IConfig.BUFFER_FLUSH_MS_PROPERTY_NAME;
public static final int NO_BUFFER_FLUSH = -1;
public static final int IMMEDIATE_BUFFER_FLUSH = 0;

Expand All @@ -105,14 +118,16 @@ public final class BrokerConstants {
public static final String METRICS_LIBRATO_TOKEN_PROPERTY_NAME = "metrics.librato.token";
public static final String METRICS_LIBRATO_SOURCE_PROPERTY_NAME = "metrics.librato.source";

public static final String ENABLE_TELEMETRY_NAME = "telemetry_enabled";
@Deprecated
public static final String ENABLE_TELEMETRY_NAME = IConfig.ENABLE_TELEMETRY_NAME;

public static final String BUGSNAG_ENABLE_PROPERTY_NAME = "use_bugsnag";
public static final String BUGSNAG_TOKEN_PROPERTY_NAME = "bugsnag.token";

public static final String STORAGE_CLASS_NAME = "storage_class";

public static final String PERSISTENT_CLEAN_EXPIRATION_PROPERTY_NAME = "persistent_client_expiration";
@Deprecated
public static final String PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME = IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME;

public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
public static final int INFLIGHT_WINDOW_SIZE = 10;
Expand Down
14 changes: 7 additions & 7 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.config.FileResourceLoader;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.IResourceLoader;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.broker.config.ResourceLoaderConfig;
import io.moquette.broker.config.*;
import io.moquette.broker.security.ACLFileParser;
import io.moquette.broker.security.AcceptAllAuthenticator;
import io.moquette.broker.security.DenyAllAuthorizatorPolicy;
Expand Down Expand Up @@ -245,8 +241,8 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
final Authorizator authorizator = new Authorizator(authorizatorPolicy);

final int globalSessionExpiry;
if (config.getProperty(BrokerConstants.PERSISTENT_CLEAN_EXPIRATION_PROPERTY_NAME) != null) {
globalSessionExpiry = (int) config.durationProp(BrokerConstants.PERSISTENT_CLEAN_EXPIRATION_PROPERTY_NAME).toMillis() / 1000;
if (config.getProperty(BrokerConstants.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME) != null) {
globalSessionExpiry = (int) config.durationProp(BrokerConstants.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME).toMillis() / 1000;
} else {
globalSessionExpiry = INFINITE_EXPIRY;
}
Expand Down Expand Up @@ -671,4 +667,8 @@ public boolean disconnectClient(final String clientId) {
public boolean disconnectAndPurgeClientState(final String clientId) {
return sessions.dropSession(clientId, true);
}

public FluentConfig withConfig() {
return new FluentConfig(this);
}
}
Loading

0 comments on commit 830e8e2

Please sign in to comment.