From 2f583a0608ee7987b88d78af0422414f4e2b1de5 Mon Sep 17 00:00:00 2001 From: Ruguo Date: Sun, 22 Nov 2020 22:45:53 +0800 Subject: [PATCH 1/2] [BAHIR-212] Add MQTT Connector for Flink --- distribution/pom.xml | 5 + flink-connector-mqtt/README.md | 15 ++ flink-connector-mqtt/pom.xml | 69 ++++++ .../streaming/connectors/mqtt/MqttConfig.java | 45 ++++ .../streaming/connectors/mqtt/MqttSink.java | 171 +++++++++++++ .../streaming/connectors/mqtt/MqttSource.java | 229 ++++++++++++++++++ .../connectors/mqtt/MqttSinkTest.java | 106 ++++++++ .../connectors/mqtt/MqttSourceTest.java | 66 +++++ .../src/test/resources/application.properties | 7 + .../src/test/resources/log4j.properties | 24 ++ pom.xml | 1 + 11 files changed, 738 insertions(+) create mode 100644 flink-connector-mqtt/README.md create mode 100644 flink-connector-mqtt/pom.xml create mode 100644 flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttConfig.java create mode 100644 flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSink.java create mode 100644 flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSource.java create mode 100644 flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java create mode 100644 flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSourceTest.java create mode 100644 flink-connector-mqtt/src/test/resources/application.properties create mode 100644 flink-connector-mqtt/src/test/resources/log4j.properties diff --git a/distribution/pom.xml b/distribution/pom.xml index b4834b12..d8fb2dbf 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -47,6 +47,11 @@ flink-connector-flume_${scala.binary.version} ${project.version} + + org.apache.bahir + flink-connector-mqtt_${scala.binary.version} + ${project.version} + org.apache.bahir flink-connector-netty_${scala.binary.version} diff --git a/flink-connector-mqtt/README.md b/flink-connector-mqtt/README.md new file mode 100644 index 00000000..35f8b4a1 --- /dev/null +++ b/flink-connector-mqtt/README.md @@ -0,0 +1,15 @@ +# Flink Mqtt Connector + +This connector provides a source and sink to [MQTT](https://mqtt.org/)™ +To use this connector, add the following dependency to your project: + + + org.apache.bahir + flink-connector-mqtt_2.11 + 1.1-SNAPSHOT + + +Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. +See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html). + +The source class is called `MqttSource`, and the sink is `MqttSink`. diff --git a/flink-connector-mqtt/pom.xml b/flink-connector-mqtt/pom.xml new file mode 100644 index 00000000..b755066b --- /dev/null +++ b/flink-connector-mqtt/pom.xml @@ -0,0 +1,69 @@ + + + + + 4.0.0 + + + org.apache.bahir + bahir-flink-parent_2.11 + 1.1-SNAPSHOT + .. + + + flink-connector-mqtt_2.11 + flink-connector-mqtt + + jar + + + 1.2.2 + + + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + ${mqtt.paho.client.version} + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + test-jar + test + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + test + + + + + diff --git a/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttConfig.java b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttConfig.java new file mode 100644 index 00000000..e69bdc42 --- /dev/null +++ b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.mqtt; + +import java.io.Serializable; +import java.util.Properties; + +/** + * mqtt config, include required / optional property keys. + */ +public class MqttConfig implements Serializable { + + // ----- Required property keys + protected static final String SERVER_URL = "server.url"; + protected static final String USERNAME = "username"; + protected static final String PASSWORD = "password"; + + // ------ Optional property keys + protected static final String CLIENT_ID = "client.id"; + protected static final String CLEAN_SESSION = "clean.session"; + protected static final String RETAINED = "retained"; + protected static final String QOS = "qos"; + + protected static void checkProperty(Properties properties, String key) { + if (!properties.containsKey(key)) { + throw new IllegalArgumentException("Required property '" + key + "' not set."); + } + } + +} diff --git a/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSink.java b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSink.java new file mode 100644 index 00000000..88c4e267 --- /dev/null +++ b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSink.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.mqtt; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +import static java.util.Collections.singletonList; +import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement; +import static org.apache.flink.streaming.connectors.mqtt.MqttConfig.checkProperty; + +/** + * Sink for writing messages to an Mqtt queue. + * + * @param type of input messages + */ +public class MqttSink extends RichSinkFunction implements CheckpointedFunction, MqttCallbackExtended { + private static final Logger LOG = LoggerFactory.getLogger(MqttSink.class); + + // Mqtt publisher client id + private static final String MQTT_CLIENT_ID = MqttClient.generateClientId(); + + // Publish topic name + private final String topic; + // Convert input message to bytes + private final SerializationSchema serializationSchema; + // User-supplied mqtt properties + private final Properties properties; + + private String serverUrl; + private String username; + private String password; + private String clientId; + private boolean cleanSession; + private boolean retained; + private int qos; + + private transient ListState mqttState; + private transient MqttClient client; + + /** + * Creates {@link MqttSink} for Streaming + * + * @param topic Publish topic name + * @param serializationSchema Convert input message to bytes + * @param properties Mqtt properties + */ + public MqttSink(String topic, + SerializationSchema serializationSchema, + Properties properties) { + checkProperty(properties, MqttConfig.SERVER_URL); + checkProperty(properties, MqttConfig.USERNAME); + checkProperty(properties, MqttConfig.PASSWORD); + + this.topic = topic; + this.serializationSchema = serializationSchema; + this.properties = properties; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.mqttState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>("mqttClientId", String.class)); + this.clientId = context.isRestored() ? getOnlyElement(mqttState.get()) : MQTT_CLIENT_ID; + LOG.info("Current mqtt publisher clientId is : {}.", clientId); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + mqttState.update(singletonList(MQTT_CLIENT_ID)); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.serverUrl = properties.getProperty(MqttConfig.SERVER_URL); + this.username = properties.getProperty(MqttConfig.USERNAME); + this.password = properties.getProperty(MqttConfig.PASSWORD); + this.clientId = properties.getProperty(MqttConfig.CLIENT_ID, clientId); + this.cleanSession = Boolean.parseBoolean(properties.getProperty( + MqttConfig.CLEAN_SESSION, "true")); + this.retained = Boolean.parseBoolean(properties.getProperty( + MqttConfig.RETAINED, "false")); + this.qos = Integer.parseInt(properties.getProperty( + MqttConfig.QOS, "1")); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setCleanSession(cleanSession); + options.setAutomaticReconnect(true); + + client = new MqttClient(serverUrl, clientId, new MemoryPersistence()); + client.setCallback(this); + // Connect to mqtt broker + client.connect(options); + + LOG.info("Sink subtask {} initialize by serverUrl:{}, start publishing topic:{} with " + + "clientId:{} and qos:{}.", getRuntimeContext().getIndexOfThisSubtask(), + serverUrl, topic, clientId, qos); + } + + @Override + public void invoke(IN value, Context context) throws Exception { + try { + byte[] message = serializationSchema.serialize(value); + client.publish(topic, message, qos, retained); + } catch (Exception e) { + LOG.error("Failed to publish message to mqtt broker, topic: {}, message: {}", topic, value); + throw new RuntimeException("Failed to publish message to mqtt broker", e); + } + } + + @Override + public void close() throws Exception { + client.disconnect(); + client.close(); + } + + @Override + public void connectComplete(boolean reconnect, String serUrl) { + LOG.info("Call connectComplete method, reconnect:{}, serUrl:{}.", reconnect, serUrl); + } + + @Override + public void connectionLost(Throwable cause) { + LOG.error("Connection has losted between publisher org.apache.flink.streaming.connectors.mqtt client and broker.", cause); + cause.printStackTrace(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + throw new IllegalStateException("Method messageArrived is for subscriber, so should not go to here."); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + +} diff --git a/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSource.java b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSource.java new file mode 100644 index 00000000..feeb2e2e --- /dev/null +++ b/flink-connector-mqtt/src/main/java/org/apache/flink/streaming/connectors/mqtt/MqttSource.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.mqtt; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; + +import static java.util.Collections.singletonList; +import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement; +import static org.apache.flink.streaming.connectors.mqtt.MqttConfig.checkProperty; + +/** + * Source for reading messages from an Mqtt queue. + * + * @param type of output messages + */ +public class MqttSource extends RichSourceFunction + implements CheckpointedFunction, ResultTypeQueryable, MqttCallbackExtended { + private static final Logger LOG = LoggerFactory.getLogger(MqttSource.class); + + // Mqtt subcriber client id + private static final String MQTT_CLIENT_ID = MqttClient.generateClientId(); + // Blocking queue max capacity + private static final int MAX_QUEUE_CAPACITY = 10000; + + // Subscribe one or more topics + private final String[] topics; + // Convert bytes to output message + private final DeserializationSchema deserializationSchema; + // User-supplied properties + private final Properties properties; + // Queue cache data form mqtt client + private final LinkedBlockingQueue queue; + + private String serverUrl; + private String username; + private String password; + private String clientId; + private boolean cleanSession; + private int qos; + + private transient ListState mqttState; + private transient MqttClient client; + + private volatile boolean isRunning; + + public MqttSource(String topic, DeserializationSchema deserializationSchema, Properties properties) { + this(Collections.singletonList(topic), deserializationSchema, properties, MAX_QUEUE_CAPACITY); + } + + public MqttSource(List topics, DeserializationSchema deserializationSchema, Properties properties) { + this(topics, deserializationSchema, properties, MAX_QUEUE_CAPACITY); + } + + /** + * Creates {@link MqttSource} for Streaming + * + * @param topics Subscribe topic list + * @param deserializationSchema Convert bytes to output message + * @param properties Mqtt properties + * @param capacity Blocking queue capacity + */ + public MqttSource(List topics, + DeserializationSchema deserializationSchema, + Properties properties, + int capacity) { + checkProperty(properties, MqttConfig.SERVER_URL); + checkProperty(properties, MqttConfig.USERNAME); + checkProperty(properties, MqttConfig.PASSWORD); + + this.topics = topics.toArray(new String[topics.size()]); + this.deserializationSchema = deserializationSchema; + this.properties = properties; + this.queue = new LinkedBlockingQueue<>(capacity); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.mqttState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>("mqttClientId", String.class)); + this.clientId = context.isRestored() ? getOnlyElement(mqttState.get()) : MQTT_CLIENT_ID; + LOG.info("Current mqtt subcriber clientId is: {}", clientId); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + if (connect()) { + this.isRunning = true; + } + } + + private boolean connect() throws Exception { + this.serverUrl = properties.getProperty(MqttConfig.SERVER_URL); + this.username = properties.getProperty(MqttConfig.USERNAME); + this.password = properties.getProperty(MqttConfig.PASSWORD); + this.clientId = properties.getProperty(MqttConfig.CLIENT_ID, clientId); + this.cleanSession = Boolean.parseBoolean(properties.getProperty(MqttConfig.CLEAN_SESSION, "true")); + this.qos = Integer.parseInt(properties.getProperty(MqttConfig.QOS, "1")); + + int[] qoses = new int[topics.length]; + Arrays.fill(qoses, qos); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setCleanSession(cleanSession); + options.setAutomaticReconnect(true); + + client = new MqttClient(serverUrl, clientId, new MemoryPersistence()); + client.setCallback(this); + // Connect to mqtt broker + client.connect(options); + // Start subscribe topic + client.subscribe(topics, qoses); + + LOG.info("Source subtask {} initialize by serverUrl:{}, start subscribing topics:{} with " + + "clientId:{} and qoses:{}.", getRuntimeContext().getIndexOfThisSubtask(), serverUrl, + Arrays.asList(topics), clientId, Arrays.asList(ArrayUtils.toObject(qoses))); + + return client.isConnected(); + } + + @Override + public void run(SourceContext ctx) throws Exception { + LOG.info("Consumer subtask {} start receiving data pushed by mqtt broker.", + getRuntimeContext().getIndexOfThisSubtask()); + while (isRunning) { + MqttMessage message = queue.take(); + try { + OUT value = deserializationSchema.deserialize(message.getPayload()); + synchronized (ctx.getCheckpointLock()) { + ctx.collect(value); + } + } catch (Exception e) { + LOG.error("Failed to deserialize message, topic: {}, " + "message: {}", Arrays.asList(topics), message); + throw new RuntimeException("Failed to deserialize message", e); + } + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + mqttState.update(singletonList(MQTT_CLIENT_ID)); + } + + @Override + public void cancel() { + close(); + } + + @Override + public void close() { + try { + queue.clear(); + client.disconnect(); + client.close(); + } catch (Exception e) { + LOG.error("Close mqtt subscriber client failed", e); + } finally { + isRunning = false; + } + } + + @Override + public void connectComplete(boolean reconnect, String serUrl) { + LOG.info("Call connectCoplete method, reconnect:{}, serverUrl:{}.", reconnect, serUrl); + } + + @Override + public void connectionLost(Throwable cause) { + LOG.error("Connection has losted betweenv mqtt subscriber client and broker.", cause); + cause.printStackTrace(); + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + LOG.debug("Message {} has arrived from topic {}.", mqttMessage.getPayload(), topic); + queue.put(mqttMessage); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + throw new IllegalStateException("Method deliveryComplete is for publisher, so should not go to here."); + } + + @Override + public TypeInformation getProducedType() { + return deserializationSchema.getProducedType(); + } +} diff --git a/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java new file mode 100644 index 00000000..2a1ceafc --- /dev/null +++ b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.mqtt; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import java.io.FileInputStream; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + +public class MqttSinkTest { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + + Properties properties = new Properties(); + properties.load(new FileInputStream(MqttSinkTest.class.getClassLoader().getResource( + "application.properties").getFile())); + + Properties mqttProperties = new Properties(); + + // mqtt server url = tcp://.messaging.internetofthings.ibmcloud.com:1883 + mqttProperties.setProperty(MqttConfig.SERVER_URL, + String.format("tcp://%s.messaging.internetofthings.ibmcloud.com:1883", properties.getProperty("Org_ID"))); + + // client id = a:: + mqttProperties.setProperty(MqttConfig.CLIENT_ID, + String.format("a:%s:%s", properties.getProperty("Org_ID"), properties.getProperty("App_Id"))); + + mqttProperties.setProperty(MqttConfig.USERNAME, properties.getProperty("API_Key")); + mqttProperties.setProperty(MqttConfig.PASSWORD, properties.getProperty("APP_Authentication_Token")); + + String topic = String.format("iot/type/%s/id/%s/evt/%s/fmt/json", + properties.getProperty("Device_Type"), + properties.getProperty("Device_ID"), + properties.getProperty("EVENT_ID")); + + // Create source that generate number from 1001 to 2000 + RandomSource randomSource = new RandomSource(); + + // Create mqtt sink public data + MqttSink mqttSink = new MqttSink(topic, new SimpleStringSchema(), mqttProperties); + + DataStreamSource source = env.addSource(randomSource); + DataStream mapStream = source.map((MapFunction) value -> + System.currentTimeMillis() + "_" + value); + + mapStream.addSink(mqttSink); + mapStream.print(); + + env.execute("RandomSourceToMqttSink"); + } + + static class RandomSource extends RichSourceFunction { + private volatile boolean isRunning; + private AtomicLong count; + + public RandomSource() { + this.isRunning = true; + this.count = new AtomicLong(1000); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + } + + public void run(SourceContext collect) throws Exception { + while (isRunning) { + if (count.incrementAndGet() % 1000 == 0) { + count.set(1000); + } else { + Thread.sleep(3 * 1000); + } + + collect.collect(count.get() + ""); + } + } + + public void cancel() { + this.isRunning = false; + } + } + +} diff --git a/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSourceTest.java b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSourceTest.java new file mode 100644 index 00000000..2462a91b --- /dev/null +++ b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSourceTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.mqtt; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.io.FileInputStream; +import java.util.Properties; + +public class MqttSourceTest { + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + + Properties properties = new Properties(); + properties.load(new FileInputStream(MqttSourceTest.class.getClassLoader().getResource( + "application.properties").getFile())); + + Properties mqttProperties = new Properties(); + + // mqtt server url = tcp://.messaging.internetofthings.ibmcloud.com:1883 + mqttProperties.setProperty(MqttConfig.SERVER_URL, + String.format("tcp://%s.messaging.internetofthings.ibmcloud.com:1883", properties.getProperty("Org_ID"))); + + // client id = a:: + mqttProperties.setProperty(MqttConfig.CLIENT_ID, + String.format("a:%s:%s", properties.getProperty("Org_ID"), properties.getProperty("App_Id"))); + + mqttProperties.setProperty(MqttConfig.USERNAME, properties.getProperty("API_Key")); + mqttProperties.setProperty(MqttConfig.PASSWORD, properties.getProperty("APP_Authentication_Token")); + + String topic = String.format("iot/type/%s/id/%s/evt/%s/fmt/json", + properties.getProperty("Device_Type"), + properties.getProperty("Device_ID"), + properties.getProperty("EVENT_ID")); + + // Create mqtt source subscribe topic + MqttSource mqttSource = new MqttSource(topic, new SimpleStringSchema(), mqttProperties); + DataStreamSource tempratureDataSource = env.addSource(mqttSource); + DataStream stream = tempratureDataSource.map((MapFunction) s -> + System.currentTimeMillis() + s); + stream.print(); + + env.execute("MqttSourceToPrintSink"); + } + +} diff --git a/flink-connector-mqtt/src/test/resources/application.properties b/flink-connector-mqtt/src/test/resources/application.properties new file mode 100644 index 00000000..d4299fe2 --- /dev/null +++ b/flink-connector-mqtt/src/test/resources/application.properties @@ -0,0 +1,7 @@ +Org_ID=248f64 +Device_Type=TempSensor +Device_ID=RoomTempSensor +App_Id=io_github_pkhanal_DeviceDataAnalysis +API_Key=a-248f64-werrdnw4pd +APP_Authentication_Token=I5SBlIuODWvg@**Q0t +EVENT_ID=temperature diff --git a/flink-connector-mqtt/src/test/resources/log4j.properties b/flink-connector-mqtt/src/test/resources/log4j.properties new file mode 100644 index 00000000..28d81db0 --- /dev/null +++ b/flink-connector-mqtt/src/test/resources/log4j.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=WARN, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/pom.xml b/pom.xml index 109b5282..28cd2dcc 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,7 @@ flink-connector-flume flink-connector-influxdb flink-connector-kudu + flink-connector-mqtt flink-connector-netty flink-connector-redis flink-library-siddhi From 6418910ef2eea7c834f4881838c7dfef3f72e7b8 Mon Sep 17 00:00:00 2001 From: Ruguo Date: Mon, 23 Nov 2020 14:39:38 +0800 Subject: [PATCH 2/2] [BAHIR-212] Add MQTT Connector for Flink --- .../connectors/mqtt/MqttSinkTest.java | 2 +- .../src/test/resources/application.properties | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java index 2a1ceafc..4ef53933 100644 --- a/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java +++ b/flink-connector-mqtt/src/test/java/org/apache/flink/streaming/connectors/mqtt/MqttSinkTest.java @@ -76,7 +76,7 @@ static class RandomSource extends RichSourceFunction { private volatile boolean isRunning; private AtomicLong count; - public RandomSource() { + RandomSource() { this.isRunning = true; this.count = new AtomicLong(1000); } diff --git a/flink-connector-mqtt/src/test/resources/application.properties b/flink-connector-mqtt/src/test/resources/application.properties index d4299fe2..1444d846 100644 --- a/flink-connector-mqtt/src/test/resources/application.properties +++ b/flink-connector-mqtt/src/test/resources/application.properties @@ -1,3 +1,21 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + Org_ID=248f64 Device_Type=TempSensor Device_ID=RoomTempSensor