diff --git a/distribution/pom.xml b/distribution/pom.xml
index b4834b12..d8fb2dbf 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -47,6 +47,11 @@
flink-connector-flume_${scala.binary.version}
${project.version}
+
+ org.apache.bahir
+ flink-connector-mqtt_${scala.binary.version}
+ ${project.version}
+
org.apache.bahir
flink-connector-netty_${scala.binary.version}
diff --git a/flink-connector-mqtt/README.md b/flink-connector-mqtt/README.md
new file mode 100644
index 00000000..35f8b4a1
--- /dev/null
+++ b/flink-connector-mqtt/README.md
@@ -0,0 +1,15 @@
+# Flink Mqtt Connector
+
+This connector provides a source and sink to [MQTT](https://mqtt.org/)™
+To use this connector, add the following dependency to your project:
+
+
+ org.apache.bahir
+ flink-connector-mqtt_2.11
+ 1.1-SNAPSHOT
+
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html).
+
+The source class is called `MqttSource`, and the sink is `MqttSink`.
diff --git a/flink-connector-mqtt/pom.xml b/flink-connector-mqtt/pom.xml
new file mode 100644
index 00000000..b755066b
--- /dev/null
+++ b/flink-connector-mqtt/pom.xml
@@ -0,0 +1,69 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.bahir
+ bahir-flink-parent_2.11
+ 1.1-SNAPSHOT
+ ..
+
+
+ flink-connector-mqtt_2.11
+ flink-connector-mqtt
+
+ jar
+
+
+ 1.2.2
+
+
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ ${mqtt.paho.client.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.binary.version}
+ ${flink.version}
+ test-jar
+ test
+
+
+ org.apache.flink
+ flink-clients_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+
+
+
diff --git a/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttConfig.java b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttConfig.java
new file mode 100644
index 00000000..e69bdc42
--- /dev/null
+++ b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.mqtt;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * mqtt config, include required / optional property keys.
+ */
+public class MqttConfig implements Serializable {
+
+ // ----- Required property keys
+ protected static final String SERVER_URL = "server.url";
+ protected static final String USERNAME = "username";
+ protected static final String PASSWORD = "password";
+
+ // ------ Optional property keys
+ protected static final String CLIENT_ID = "client.id";
+ protected static final String CLEAN_SESSION = "clean.session";
+ protected static final String RETAINED = "retained";
+ protected static final String QOS = "qos";
+
+ protected static void checkProperty(Properties properties, String key) {
+ if (!properties.containsKey(key)) {
+ throw new IllegalArgumentException("Required property '" + key + "' not set.");
+ }
+ }
+
+}
diff --git a/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSink.java b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSink.java
new file mode 100644
index 00000000..88c4e267
--- /dev/null
+++ b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSink.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.mqtt;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
+import static org.apache.flink.streaming.connectors.mqtt.MqttConfig.checkProperty;
+
+/**
+ * Sink for writing messages to an Mqtt queue.
+ *
+ * @param type of input messages
+ */
+public class MqttSink extends RichSinkFunction implements CheckpointedFunction, MqttCallbackExtended {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttSink.class);
+
+ // Mqtt publisher client id
+ private static final String MQTT_CLIENT_ID = MqttClient.generateClientId();
+
+ // Publish topic name
+ private final String topic;
+ // Convert input message to bytes
+ private final SerializationSchema serializationSchema;
+ // User-supplied mqtt properties
+ private final Properties properties;
+
+ private String serverUrl;
+ private String username;
+ private String password;
+ private String clientId;
+ private boolean cleanSession;
+ private boolean retained;
+ private int qos;
+
+ private transient ListState mqttState;
+ private transient MqttClient client;
+
+ /**
+ * Creates {@link MqttSink} for Streaming
+ *
+ * @param topic Publish topic name
+ * @param serializationSchema Convert input message to bytes
+ * @param properties Mqtt properties
+ */
+ public MqttSink(String topic,
+ SerializationSchema serializationSchema,
+ Properties properties) {
+ checkProperty(properties, MqttConfig.SERVER_URL);
+ checkProperty(properties, MqttConfig.USERNAME);
+ checkProperty(properties, MqttConfig.PASSWORD);
+
+ this.topic = topic;
+ this.serializationSchema = serializationSchema;
+ this.properties = properties;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.mqttState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>("mqttClientId", String.class));
+ this.clientId = context.isRestored() ? getOnlyElement(mqttState.get()) : MQTT_CLIENT_ID;
+ LOG.info("Current mqtt publisher clientId is : {}.", clientId);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ mqttState.update(singletonList(MQTT_CLIENT_ID));
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.serverUrl = properties.getProperty(MqttConfig.SERVER_URL);
+ this.username = properties.getProperty(MqttConfig.USERNAME);
+ this.password = properties.getProperty(MqttConfig.PASSWORD);
+ this.clientId = properties.getProperty(MqttConfig.CLIENT_ID, clientId);
+ this.cleanSession = Boolean.parseBoolean(properties.getProperty(
+ MqttConfig.CLEAN_SESSION, "true"));
+ this.retained = Boolean.parseBoolean(properties.getProperty(
+ MqttConfig.RETAINED, "false"));
+ this.qos = Integer.parseInt(properties.getProperty(
+ MqttConfig.QOS, "1"));
+
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setUserName(username);
+ options.setPassword(password.toCharArray());
+ options.setCleanSession(cleanSession);
+ options.setAutomaticReconnect(true);
+
+ client = new MqttClient(serverUrl, clientId, new MemoryPersistence());
+ client.setCallback(this);
+ // Connect to mqtt broker
+ client.connect(options);
+
+ LOG.info("Sink subtask {} initialize by serverUrl:{}, start publishing topic:{} with " +
+ "clientId:{} and qos:{}.", getRuntimeContext().getIndexOfThisSubtask(),
+ serverUrl, topic, clientId, qos);
+ }
+
+ @Override
+ public void invoke(IN value, Context context) throws Exception {
+ try {
+ byte[] message = serializationSchema.serialize(value);
+ client.publish(topic, message, qos, retained);
+ } catch (Exception e) {
+ LOG.error("Failed to publish message to mqtt broker, topic: {}, message: {}", topic, value);
+ throw new RuntimeException("Failed to publish message to mqtt broker", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.disconnect();
+ client.close();
+ }
+
+ @Override
+ public void connectComplete(boolean reconnect, String serUrl) {
+ LOG.info("Call connectComplete method, reconnect:{}, serUrl:{}.", reconnect, serUrl);
+ }
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ LOG.error("Connection has losted between publisher org.apache.flink.streaming.connectors.mqtt client and broker.", cause);
+ cause.printStackTrace();
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ throw new IllegalStateException("Method messageArrived is for subscriber, so should not go to here.");
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ }
+
+}
diff --git a/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSource.java b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSource.java
new file mode 100644
index 00000000..feeb2e2e
--- /dev/null
+++ b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSource.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.mqtt;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
+import static org.apache.flink.streaming.connectors.mqtt.MqttConfig.checkProperty;
+
+/**
+ * Source for reading messages from an Mqtt queue.
+ *
+ * @param type of output messages
+ */
+public class MqttSource extends RichSourceFunction
+ implements CheckpointedFunction, ResultTypeQueryable, MqttCallbackExtended {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttSource.class);
+
+ // Mqtt subcriber client id
+ private static final String MQTT_CLIENT_ID = MqttClient.generateClientId();
+ // Blocking queue max capacity
+ private static final int MAX_QUEUE_CAPACITY = 10000;
+
+ // Subscribe one or more topics
+ private final String[] topics;
+ // Convert bytes to output message
+ private final DeserializationSchema deserializationSchema;
+ // User-supplied properties
+ private final Properties properties;
+ // Queue cache data form mqtt client
+ private final LinkedBlockingQueue queue;
+
+ private String serverUrl;
+ private String username;
+ private String password;
+ private String clientId;
+ private boolean cleanSession;
+ private int qos;
+
+ private transient ListState mqttState;
+ private transient MqttClient client;
+
+ private volatile boolean isRunning;
+
+ public MqttSource(String topic, DeserializationSchema deserializationSchema, Properties properties) {
+ this(Collections.singletonList(topic), deserializationSchema, properties, MAX_QUEUE_CAPACITY);
+ }
+
+ public MqttSource(List topics, DeserializationSchema deserializationSchema, Properties properties) {
+ this(topics, deserializationSchema, properties, MAX_QUEUE_CAPACITY);
+ }
+
+ /**
+ * Creates {@link MqttSource} for Streaming
+ *
+ * @param topics Subscribe topic list
+ * @param deserializationSchema Convert bytes to output message
+ * @param properties Mqtt properties
+ * @param capacity Blocking queue capacity
+ */
+ public MqttSource(List topics,
+ DeserializationSchema deserializationSchema,
+ Properties properties,
+ int capacity) {
+ checkProperty(properties, MqttConfig.SERVER_URL);
+ checkProperty(properties, MqttConfig.USERNAME);
+ checkProperty(properties, MqttConfig.PASSWORD);
+
+ this.topics = topics.toArray(new String[topics.size()]);
+ this.deserializationSchema = deserializationSchema;
+ this.properties = properties;
+ this.queue = new LinkedBlockingQueue<>(capacity);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.mqttState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>("mqttClientId", String.class));
+ this.clientId = context.isRestored() ? getOnlyElement(mqttState.get()) : MQTT_CLIENT_ID;
+ LOG.info("Current mqtt subcriber clientId is: {}", clientId);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ if (connect()) {
+ this.isRunning = true;
+ }
+ }
+
+ private boolean connect() throws Exception {
+ this.serverUrl = properties.getProperty(MqttConfig.SERVER_URL);
+ this.username = properties.getProperty(MqttConfig.USERNAME);
+ this.password = properties.getProperty(MqttConfig.PASSWORD);
+ this.clientId = properties.getProperty(MqttConfig.CLIENT_ID, clientId);
+ this.cleanSession = Boolean.parseBoolean(properties.getProperty(MqttConfig.CLEAN_SESSION, "true"));
+ this.qos = Integer.parseInt(properties.getProperty(MqttConfig.QOS, "1"));
+
+ int[] qoses = new int[topics.length];
+ Arrays.fill(qoses, qos);
+
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setUserName(username);
+ options.setPassword(password.toCharArray());
+ options.setCleanSession(cleanSession);
+ options.setAutomaticReconnect(true);
+
+ client = new MqttClient(serverUrl, clientId, new MemoryPersistence());
+ client.setCallback(this);
+ // Connect to mqtt broker
+ client.connect(options);
+ // Start subscribe topic
+ client.subscribe(topics, qoses);
+
+ LOG.info("Source subtask {} initialize by serverUrl:{}, start subscribing topics:{} with " +
+ "clientId:{} and qoses:{}.", getRuntimeContext().getIndexOfThisSubtask(), serverUrl,
+ Arrays.asList(topics), clientId, Arrays.asList(ArrayUtils.toObject(qoses)));
+
+ return client.isConnected();
+ }
+
+ @Override
+ public void run(SourceContext ctx) throws Exception {
+ LOG.info("Consumer subtask {} start receiving data pushed by mqtt broker.",
+ getRuntimeContext().getIndexOfThisSubtask());
+ while (isRunning) {
+ MqttMessage message = queue.take();
+ try {
+ OUT value = deserializationSchema.deserialize(message.getPayload());
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(value);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to deserialize message, topic: {}, " + "message: {}", Arrays.asList(topics), message);
+ throw new RuntimeException("Failed to deserialize message", e);
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ mqttState.update(singletonList(MQTT_CLIENT_ID));
+ }
+
+ @Override
+ public void cancel() {
+ close();
+ }
+
+ @Override
+ public void close() {
+ try {
+ queue.clear();
+ client.disconnect();
+ client.close();
+ } catch (Exception e) {
+ LOG.error("Close mqtt subscriber client failed", e);
+ } finally {
+ isRunning = false;
+ }
+ }
+
+ @Override
+ public void connectComplete(boolean reconnect, String serUrl) {
+ LOG.info("Call connectCoplete method, reconnect:{}, serverUrl:{}.", reconnect, serUrl);
+ }
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ LOG.error("Connection has losted betweenv mqtt subscriber client and broker.", cause);
+ cause.printStackTrace();
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+ LOG.debug("Message {} has arrived from topic {}.", mqttMessage.getPayload(), topic);
+ queue.put(mqttMessage);
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ throw new IllegalStateException("Method deliveryComplete is for publisher, so should not go to here.");
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+}
diff --git a/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java
new file mode 100644
index 00000000..4ef53933
--- /dev/null
+++ b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.mqtt;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MqttSinkTest {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+ Properties properties = new Properties();
+ properties.load(new FileInputStream(MqttSinkTest.class.getClassLoader().getResource(
+ "application.properties").getFile()));
+
+ Properties mqttProperties = new Properties();
+
+ // mqtt server url = tcp://.messaging.internetofthings.ibmcloud.com:1883
+ mqttProperties.setProperty(MqttConfig.SERVER_URL,
+ String.format("tcp://%s.messaging.internetofthings.ibmcloud.com:1883", properties.getProperty("Org_ID")));
+
+ // client id = a::
+ mqttProperties.setProperty(MqttConfig.CLIENT_ID,
+ String.format("a:%s:%s", properties.getProperty("Org_ID"), properties.getProperty("App_Id")));
+
+ mqttProperties.setProperty(MqttConfig.USERNAME, properties.getProperty("API_Key"));
+ mqttProperties.setProperty(MqttConfig.PASSWORD, properties.getProperty("APP_Authentication_Token"));
+
+ String topic = String.format("iot/type/%s/id/%s/evt/%s/fmt/json",
+ properties.getProperty("Device_Type"),
+ properties.getProperty("Device_ID"),
+ properties.getProperty("EVENT_ID"));
+
+ // Create source that generate number from 1001 to 2000
+ RandomSource randomSource = new RandomSource();
+
+ // Create mqtt sink public data
+ MqttSink mqttSink = new MqttSink(topic, new SimpleStringSchema(), mqttProperties);
+
+ DataStreamSource source = env.addSource(randomSource);
+ DataStream mapStream = source.map((MapFunction) value ->
+ System.currentTimeMillis() + "_" + value);
+
+ mapStream.addSink(mqttSink);
+ mapStream.print();
+
+ env.execute("RandomSourceToMqttSink");
+ }
+
+ static class RandomSource extends RichSourceFunction {
+ private volatile boolean isRunning;
+ private AtomicLong count;
+
+ RandomSource() {
+ this.isRunning = true;
+ this.count = new AtomicLong(1000);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+ public void run(SourceContext collect) throws Exception {
+ while (isRunning) {
+ if (count.incrementAndGet() % 1000 == 0) {
+ count.set(1000);
+ } else {
+ Thread.sleep(3 * 1000);
+ }
+
+ collect.collect(count.get() + "");
+ }
+ }
+
+ public void cancel() {
+ this.isRunning = false;
+ }
+ }
+
+}
diff --git a/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSourceTest.java b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSourceTest.java
new file mode 100644
index 00000000..2462a91b
--- /dev/null
+++ b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSourceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.mqtt;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.FileInputStream;
+import java.util.Properties;
+
+public class MqttSourceTest {
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+ Properties properties = new Properties();
+ properties.load(new FileInputStream(MqttSourceTest.class.getClassLoader().getResource(
+ "application.properties").getFile()));
+
+ Properties mqttProperties = new Properties();
+
+ // mqtt server url = tcp://.messaging.internetofthings.ibmcloud.com:1883
+ mqttProperties.setProperty(MqttConfig.SERVER_URL,
+ String.format("tcp://%s.messaging.internetofthings.ibmcloud.com:1883", properties.getProperty("Org_ID")));
+
+ // client id = a::
+ mqttProperties.setProperty(MqttConfig.CLIENT_ID,
+ String.format("a:%s:%s", properties.getProperty("Org_ID"), properties.getProperty("App_Id")));
+
+ mqttProperties.setProperty(MqttConfig.USERNAME, properties.getProperty("API_Key"));
+ mqttProperties.setProperty(MqttConfig.PASSWORD, properties.getProperty("APP_Authentication_Token"));
+
+ String topic = String.format("iot/type/%s/id/%s/evt/%s/fmt/json",
+ properties.getProperty("Device_Type"),
+ properties.getProperty("Device_ID"),
+ properties.getProperty("EVENT_ID"));
+
+ // Create mqtt source subscribe topic
+ MqttSource mqttSource = new MqttSource(topic, new SimpleStringSchema(), mqttProperties);
+ DataStreamSource tempratureDataSource = env.addSource(mqttSource);
+ DataStream stream = tempratureDataSource.map((MapFunction) s ->
+ System.currentTimeMillis() + s);
+ stream.print();
+
+ env.execute("MqttSourceToPrintSink");
+ }
+
+}
diff --git a/flink-connector-mqtt/src/test/resources/application.properties b/flink-connector-mqtt/src/test/resources/application.properties
new file mode 100644
index 00000000..1444d846
--- /dev/null
+++ b/flink-connector-mqtt/src/test/resources/application.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+Org_ID=248f64
+Device_Type=TempSensor
+Device_ID=RoomTempSensor
+App_Id=io_github_pkhanal_DeviceDataAnalysis
+API_Key=a-248f64-werrdnw4pd
+APP_Authentication_Token=I5SBlIuODWvg@**Q0t
+EVENT_ID=temperature
diff --git a/flink-connector-mqtt/src/test/resources/log4j.properties b/flink-connector-mqtt/src/test/resources/log4j.properties
new file mode 100644
index 00000000..28d81db0
--- /dev/null
+++ b/flink-connector-mqtt/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=WARN, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/pom.xml b/pom.xml
index 109b5282..28cd2dcc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@
flink-connector-flume
flink-connector-influxdb
flink-connector-kudu
+ flink-connector-mqtt
flink-connector-netty
flink-connector-redis
flink-library-siddhi