Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: peer cert as username #775

Merged
merged 5 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Introduce a new option "PEER_CERTIFICATE_AS_USERNAME". When enabled, the client certificate is provided as the username to the authenticator.

Version 0.17:
[feature] Introduced DSL FluentConfig to simplify Server's API instantiation #761.
Expand Down
13 changes: 13 additions & 0 deletions broker/src/main/java/io/moquette/broker/BrokerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
class BrokerConfiguration {

private final boolean allowAnonymous;
private final boolean peerCertificateAsUsername;
private final boolean allowZeroByteClientId;
private final boolean reauthorizeSubscriptionsOnConnect;
private final int bufferFlushMillis;

BrokerConfiguration(IConfig props) {
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
peerCertificateAsUsername = props.boolProp(IConfig.PEER_CERTIFICATE_AS_USERNAME, false);
allowZeroByteClientId = props.boolProp(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, false);
reauthorizeSubscriptionsOnConnect = props.boolProp(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, false);

Expand Down Expand Up @@ -65,7 +67,14 @@ class BrokerConfiguration {

public BrokerConfiguration(boolean allowAnonymous, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis) {
this(allowAnonymous, false, allowZeroByteClientId,
reauthorizeSubscriptionsOnConnect, bufferFlushMillis);
}

public BrokerConfiguration(boolean allowAnonymous, boolean peerCertificateAsUsername, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis) {
this.allowAnonymous = allowAnonymous;
this.peerCertificateAsUsername = peerCertificateAsUsername;
this.allowZeroByteClientId = allowZeroByteClientId;
this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect;
this.bufferFlushMillis = bufferFlushMillis;
Expand All @@ -75,6 +84,10 @@ public boolean isAllowAnonymous() {
return allowAnonymous;
}

public boolean isPeerCertificateAsUsername() {
return peerCertificateAsUsername;
}

public boolean isAllowZeroByteClientId() {
return allowZeroByteClientId;
}
Expand Down
56 changes: 44 additions & 12 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.PemUtils;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
Expand All @@ -44,16 +45,21 @@
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLPeerUnverifiedException;

import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
Expand Down Expand Up @@ -396,28 +402,54 @@ private void abortConnectionV5(MqttConnectReturnCode returnCode, ConnAckProperti
}

private boolean login(MqttConnectMessage msg, final String clientId) {
// handle user authentication
String userName = null;
byte[] pwd = null;

if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
userName = msg.payload().userName();
// MQTT 3.1.2.9 does not mandate that there is a password - let the authenticator determine if it's needed
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.info("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
return false;
}
final String login = msg.payload().userName();
if (!authenticator.checkValid(clientId, login, pwd)) {
LOG.info("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
}

if (brokerConfig.isPeerCertificateAsUsername()) {
// Use peer cert as username
userName = readClientProvidedCertificates(clientId);
}

if (userName == null || userName.isEmpty()) {
if (brokerConfig.isAllowAnonymous()) {
return true;
} else {
LOG.info("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
return false;
}
NettyUtils.userName(channel, login);
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.info("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
}

if (!authenticator.checkValid(clientId, userName, pwd)) {
LOG.info("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, userName);
return false;
}
NettyUtils.userName(channel, userName);
return true;
}

private String readClientProvidedCertificates(String clientId) {
try {
SslHandler sslhandler = (SslHandler) channel.pipeline().get("ssl");
if (sslhandler != null) {
Certificate[] certificateChain = sslhandler.engine().getSession().getPeerCertificates();
return PemUtils.certificatesToPem(certificateChain);
}
} catch (SSLPeerUnverifiedException e) {
LOG.debug("No peer cert provided. CId={}", clientId);
} catch (CertificateEncodingException | IOException e) {
LOG.warn("Unable to decode client certificate. CId={}", clientId);
}
return null;
}

void handleConnectionLost() {
final String clientID = NettyUtils.clientID(channel);
if (clientID == null || clientID.isEmpty()) {
Expand Down
11 changes: 9 additions & 2 deletions broker/src/main/java/io/moquette/broker/config/FluentConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static io.moquette.broker.config.IConfig.DATA_PATH_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE;
import static io.moquette.broker.config.IConfig.ENABLE_TELEMETRY_NAME;
import static io.moquette.broker.config.IConfig.PEER_CERTIFICATE_AS_USERNAME;
import static io.moquette.broker.config.IConfig.PORT_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.JKS_PATH_PROPERTY_NAME;
import static io.moquette.broker.config.IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME;
Expand Down Expand Up @@ -84,6 +85,7 @@ private void initializeDefaultValues() {
configAccumulator.put(PORT_PROPERTY_NAME, Integer.toString(BrokerConstants.PORT));
configAccumulator.put(HOST_PROPERTY_NAME, BrokerConstants.HOST);
configAccumulator.put(PASSWORD_FILE_PROPERTY_NAME, "");
configAccumulator.put(PEER_CERTIFICATE_AS_USERNAME, Boolean.FALSE.toString());
configAccumulator.put(ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
configAccumulator.put(AUTHENTICATOR_CLASS_NAME, "");
configAccumulator.put(AUTHORIZATOR_CLASS_NAME, "");
Expand Down Expand Up @@ -141,8 +143,13 @@ public FluentConfig persistentQueueType(PersistentQueueType type) {
return this;
}

public FluentConfig allowAnonymous() {
configAccumulator.put(ALLOW_ANONYMOUS_PROPERTY_NAME, "true");
public FluentConfig enablePeerCertificateAsUsername() {
configAccumulator.put(PEER_CERTIFICATE_AS_USERNAME, "true");
return this;
}

public FluentConfig disablePeerCertificateAsUsername() {
configAccumulator.put(PEER_CERTIFICATE_AS_USERNAME, "false");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public abstract class IConfig {
public static final String HOST_PROPERTY_NAME = "host";
public static final String PASSWORD_FILE_PROPERTY_NAME = "password_file";
public static final String ALLOW_ANONYMOUS_PROPERTY_NAME = "allow_anonymous";
public static final String PEER_CERTIFICATE_AS_USERNAME = "peer_certificate_as_username";
public static final String AUTHENTICATOR_CLASS_NAME = "authenticator_class";
public static final String AUTHORIZATOR_CLASS_NAME = "authorizator_class";
public static final String PERSISTENT_QUEUE_TYPE_PROPERTY_NAME = "persistent_queue_type"; // h2 or segmented, default h2
Expand Down
101 changes: 101 additions & 0 deletions broker/src/main/java/io/moquette/broker/security/PemUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2023 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.moquette.broker.security;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.Base64;

public final class PemUtils {
private static final String certificateBoundaryType = "CERTIFICATE";

public static String certificatesToPem(Certificate... certificates)
throws CertificateEncodingException, IOException {
try (StringWriter str = new StringWriter();
PemWriter pemWriter = new PemWriter(str)) {
for (Certificate certificate : certificates) {
pemWriter.writeObject(certificateBoundaryType, certificate.getEncoded());
}
pemWriter.close();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But pemWriter isn't closed automatically by the try statement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is closed automatically yes, however it needs to be closed before calling str.toString() or else the str will not contain the correct contents due to the buffered nature of BufferedWriter which PemWriter extends.

return str.toString();
}
}

/**
* Copyright (c) 2000 - 2021 The Legion of the Bouncy Castle Inc. (https://www.bouncycastle.org)
* SPDX-License-Identifier: MIT
*
* <p>A generic PEM writer, based on RFC 1421
* From: https://javadoc.io/static/org.bouncycastle/bcprov-jdk15on/1.62/org/bouncycastle/util/io/pem/PemWriter.html</p>
*/
public static class PemWriter extends BufferedWriter {
private static final int LINE_LENGTH = 64;
private final char[] buf = new char[LINE_LENGTH];

/**
MikeDombo marked this conversation as resolved.
Show resolved Hide resolved
* Base constructor.
*
* @param out output stream to use.
*/
public PemWriter(Writer out) {
super(out);
}

/**
MikeDombo marked this conversation as resolved.
Show resolved Hide resolved
* Writes a pem encoded string.
*
* @param type key type.
* @param bytes encoded string
* @throws IOException IO Exception
*/
public void writeObject(String type, byte[] bytes) throws IOException {
writePreEncapsulationBoundary(type);
writeEncoded(bytes);
writePostEncapsulationBoundary(type);
}
MikeDombo marked this conversation as resolved.
Show resolved Hide resolved

private void writeEncoded(byte[] bytes) throws IOException {
bytes = Base64.getEncoder().encode(bytes);
for (int i = 0; i < bytes.length; i += buf.length) {
int index = 0;
while (index != buf.length) {
if ((i + index) >= bytes.length) {
break;
}
buf[index] = (char) bytes[i + index];
index++;
}
this.write(buf, 0, index);
this.newLine();
}
}
MikeDombo marked this conversation as resolved.
Show resolved Hide resolved

private void writePreEncapsulationBoundary(String type) throws IOException {
this.write("-----BEGIN " + type + "-----");
this.newLine();
}
MikeDombo marked this conversation as resolved.
Show resolved Hide resolved

private void writePostEncapsulationBoundary(String type) throws IOException {
this.write("-----END " + type + "-----");
this.newLine();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import org.junit.jupiter.api.Test;

Expand All @@ -34,6 +35,7 @@ public void defaultConfig() {
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
Expand All @@ -46,6 +48,7 @@ public void configureAllowAnonymous() {
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
Expand All @@ -58,6 +61,7 @@ public void configureAllowZeroByteClientId() {
assertTrue(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
Expand All @@ -70,6 +74,7 @@ public void configureReauthorizeSubscriptionsOnConnect() {
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertTrue(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "Immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
Expand All @@ -82,5 +87,19 @@ public void configureImmediateBufferFlush() {
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "No immediate flush by default");
assertFalse(brokerConfiguration.isPeerCertificateAsUsername());
}

@Test
public void configurePeerCertificateAsUsername() {
Properties properties = new Properties();
properties.put(IConfig.PEER_CERTIFICATE_AS_USERNAME, "true");
MemoryConfig config = new MemoryConfig(properties);
BrokerConfiguration brokerConfiguration = new BrokerConfiguration(config);
assertTrue(brokerConfiguration.isAllowAnonymous());
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
assertEquals(IMMEDIATE_BUFFER_FLUSH, brokerConfiguration.getBufferFlushMillis(), "No immediate flush by default");
assertTrue(brokerConfiguration.isPeerCertificateAsUsername());
}
}
Loading
Loading