Skip to content

Commit

Permalink
Merge branch 'main' into java8
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored Sep 29, 2023
2 parents 60b815b + 154d365 commit aba7829
Show file tree
Hide file tree
Showing 12 changed files with 657 additions and 184 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Introduce a new option "PEER_CERTIFICATE_AS_USERNAME". When enabled, the client certificate is provided as the username to the authenticator.

Version 0.17:
[feature] Introduced DSL FluentConfig to simplify Server's API instantiation #761.
Expand Down
13 changes: 13 additions & 0 deletions broker/src/main/java/io/moquette/broker/BrokerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
class BrokerConfiguration {

private final boolean allowAnonymous;
private final boolean peerCertificateAsUsername;
private final boolean allowZeroByteClientId;
private final boolean reauthorizeSubscriptionsOnConnect;
private final int bufferFlushMillis;

BrokerConfiguration(IConfig props) {
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
peerCertificateAsUsername = props.boolProp(IConfig.PEER_CERTIFICATE_AS_USERNAME, false);
allowZeroByteClientId = props.boolProp(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, false);
reauthorizeSubscriptionsOnConnect = props.boolProp(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, false);

Expand Down Expand Up @@ -65,7 +67,14 @@ class BrokerConfiguration {

public BrokerConfiguration(boolean allowAnonymous, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis) {
this(allowAnonymous, false, allowZeroByteClientId,
reauthorizeSubscriptionsOnConnect, bufferFlushMillis);
}

public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUsername, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis) {
this.allowAnonymous = allowAnonymous;
this.peerCertificateAsUsername = peerCertificateAsUsername;
this.allowZeroByteClientId = allowZeroByteClientId;
this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect;
this.bufferFlushMillis = bufferFlushMillis;
Expand All @@ -75,6 +84,10 @@ public boolean isAllowAnonymous() {
return allowAnonymous;
}

public boolean isPeerCertificateAsUsername() {
return peerCertificateAsUsername;
}

public boolean isAllowZeroByteClientId() {
return allowZeroByteClientId;
}
Expand Down
56 changes: 44 additions & 12 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.PemUtils;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
Expand All @@ -44,16 +45,21 @@
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLPeerUnverifiedException;

import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
Expand Down Expand Up @@ -396,28 +402,54 @@ private void abortConnectionV5(MqttConnectReturnCode returnCode, ConnAckProperti
}

private boolean login(MqttConnectMessage msg, final String clientId) {
// handle user authentication
String userName = null;
byte[] pwd = null;

if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
userName = msg.payload().userName();
// MQTT 3.1.2.9 does not mandate that there is a password - let the authenticator determine if it's needed
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.info("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
return false;
}
final String login = msg.payload().userName();
if (!authenticator.checkValid(clientId, login, pwd)) {
LOG.info("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
}

if (brokerConfig.isPeerCertificateAsUsername()) {
// Use peer cert as username
userName = readClientProvidedCertificates(clientId);
}

if (userName == null || userName.isEmpty()) {
if (brokerConfig.isAllowAnonymous()) {
return true;
} else {
LOG.info("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
return false;
}
NettyUtils.userName(channel, login);
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.info("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
}

if (!authenticator.checkValid(clientId, userName, pwd)) {
LOG.info("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, userName);
return false;
}
NettyUtils.userName(channel, userName);
return true;
}

private String readClientProvidedCertificates(String clientId) {
try {
SslHandler sslhandler = (SslHandler) channel.pipeline().get("ssl");
if (sslhandler != null) {
Certificate[] certificateChain = sslhandler.engine().getSession().getPeerCertificates();
return PemUtils.certificatesToPem(certificateChain);
}
} catch (SSLPeerUnverifiedException e) {
LOG.debug("No peer cert provided. CId={}", clientId);
} catch (CertificateEncodingException | IOException e) {
LOG.warn("Unable to decode client certificate. CId={}", clientId);
}
return null;
}

void handleConnectionLost() {
final String clientID = NettyUtils.clientID(channel);
if (clientID == null || clientID.isEmpty()) {
Expand Down
11 changes: 9 additions & 2 deletions broker/src/main/java/io/moquette/broker/config/FluentConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static io.moquette.broker.config.IConfig.DATA_PATH_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE;
import static io.moquette.broker.config.IConfig.ENABLE_TELEMETRY_NAME;
import static io.moquette.broker.config.IConfig.PEER_CERTIFICATE_AS_USERNAME;
import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.JKS_PATH_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME;
Expand Down Expand Up @@ -84,6 +85,7 @@ private void initializeDefaultValues() {
configAccumulator.put(PORT_PROPERTY_NAME, Integer.toString(BrokerConstants.PORT));
configAccumulator.put(HOST_PROPERTY_NAME, BrokerConstants.HOST);
configAccumulator.put(PASSWORD_FILE_PROPERTY_NAME, "");
configAccumulator.put(PEER_CERTIFICATE_AS_USERNAME, Boolean.FALSE.toString());
configAccumulator.put(ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
configAccumulator.put(AUTHENTICATOR_CLASS_NAME, "");
configAccumulator.put(AUTHORIZATOR_CLASS_NAME, "");
Expand Down Expand Up @@ -141,8 +143,13 @@ public FluentConfig persistentQueueType(PersistentQueueType type) {
return this;
}

public FluentConfig allowAnonymous() {
configAccumulator.put(ALLOW_ANONYMOUS_PROPERTY_NAME, "true");
public FluentConfig enablePeerCertificateAsUsername() {
configAccumulator.put(PEER_CERTIFICATE_AS_USERNAME, "true");
return this;
}

public FluentConfig disablePeerCertificateAsUsername() {
configAccumulator.put(PEER_CERTIFICATE_AS_USERNAME, "false");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public abstract class IConfig {
public static final String HOST_PROPERTY_NAME = "host";
public static final String PASSWORD_FILE_PROPERTY_NAME = "password_file";
public static final String ALLOW_ANONYMOUS_PROPERTY_NAME = "allow_anonymous";
public static final String PEER_CERTIFICATE_AS_USERNAME = "peer_certificate_as_username";
public static final String AUTHENTICATOR_CLASS_NAME = "authenticator_class";
public static final String AUTHORIZATOR_CLASS_NAME = "authorizator_class";
public static final String PERSISTENT_QUEUE_TYPE_PROPERTY_NAME = "persistent_queue_type"; // h2 or segmented, default h2
Expand Down
101 changes: 101 additions & 0 deletions broker/src/main/java/io/moquette/broker/security/PemUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2023 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.moquette.broker.security;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.Base64;

public final class PemUtils {
private static final String certificateBoundaryType = "CERTIFICATE";

public static String certificatesToPem(Certificate... certificates)
throws CertificateEncodingException, IOException {
try (StringWriter str = new StringWriter();
PemWriter pemWriter = new PemWriter(str)) {
for (Certificate certificate : certificates) {
pemWriter.writeObject(certificateBoundaryType, certificate.getEncoded());
}
pemWriter.close();
return str.toString();
}
}

/**
* Copyright (c) 2000 - 2021 The Legion of the Bouncy Castle Inc. (https://www.bouncycastle.org)
* SPDX-License-Identifier: MIT
*
* <p>A generic PEM writer, based on RFC 1421
* From: https://javadoc.io/static/org.bouncycastle/bcprov-jdk15on/1.62/org/bouncycastle/util/io/pem/PemWriter.html</p>
*/
public static class PemWriter extends BufferedWriter {
private static final int LINE_LENGTH = 64;
private final char[] buf = new char[LINE_LENGTH];

/**
* Base constructor.
*
* @param out output stream to use.
*/
public PemWriter(Writer out) {
super(out);
}

/**
* Writes a pem encoded string.
*
* @param type key type.
* @param bytes encoded string
* @throws IOException IO Exception
*/
public void writeObject(String type, byte[] bytes) throws IOException {
writePreEncapsulationBoundary(type);
writeEncoded(bytes);
writePostEncapsulationBoundary(type);
}

private void writeEncoded(byte[] bytes) throws IOException {
bytes = Base64.getEncoder().encode(bytes);
for (int i = 0; i < bytes.length; i += buf.length) {
int index = 0;
while (index != buf.length) {
if ((i + index) >= bytes.length) {
break;
}
buf[index] = (char) bytes[i + index];
index++;
}
this.write(buf, 0, index);
this.newLine();
}
}

private void writePreEncapsulationBoundary(String type) throws IOException {
this.write("-----BEGIN " + type + "-----");
this.newLine();
}

private void writePostEncapsulationBoundary(String type) throws IOException {
this.write("-----END " + type + "-----");
this.newLine();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import org.junit.jupiter.api.Test;

Expand All @@ -34,6 +35,7 @@ public void defaultConfig() {
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
Expand All @@ -46,6 +48,7 @@ public void configureAllowAnonymous() {
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
Expand All @@ -58,6 +61,7 @@ public void configureAllowZeroByteClientId() {
assertTrue(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
Expand All @@ -70,6 +74,7 @@ public void configureReauthorizeSubscriptionsOnConnect() {
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertTrue(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
Expand All @@ -82,5 +87,19 @@ public void configureImmediateBufferFlush() {
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "No immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
public void configurePeerCertificateAsUsername() {
Properties properties = new Properties();
properties.put(IConfig.PEER_CERTIFICATE_AS_USERNAME, "true");
MemoryConfig config = new MemoryConfig(properties);
BrokerConfiguration brokerConfiguration = new BrokerConfiguration(config);
assertTrue(brokerConfiguration.isAllowAnonymous());
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "No immediate flush by default");
assertTrue(brokerConfiguration.isPeerCertificateAsUsername());
}
}
Loading

0 comments on commit aba7829

Please sign in to comment.