Skip to content

liujson/mqtt-client-ui

Repository files navigation

MQTT.Cli

MQTT.Cli 是一个模仿 MQTT.fx 的MQTT 客户端应用。MQTT.Cli 可以用来测试MQTT连接和通信。**libmqtt **是为了解决之前项目异常掉线和为了通信稳定发展来的。**libmqtt **只能在Android平台上使用。

libmqtt 使用

引入依赖

dependencies {
    //直接引入aar方式 (也可以用依赖库的方式引入)
    implementation (name: 'libmqtt-{version}', ext: 'aar')
    
    //paho client mqttv3 (必须)
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
}

使用同步客户端

Paho提供了同步堵塞的 MqttClient ,一般不会直接使用这种方式,如果确有相关需求可直接使用。下面是官方的Demo示例。

    public static void main(String[] args) {

        String topic        = "MQTT Examples";
        String content      = "Message from MqttPublishSample";
        int qos             = 2;
        String broker       = "tcp://iot.eclipse.org:1883";
        String clientId     = "JavaSample";
        //使用内存持久化
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            //Mqtt 连接参数配置
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: "+broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: "+content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");
            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch(MqttException me) {
            System.out.println("reason "+me.getReasonCode());
            System.out.println("msg "+me.getMessage());
            System.out.println("loc "+me.getLocalizedMessage());
            System.out.println("cause "+me.getCause());
            System.out.println("excep "+me);
            me.printStackTrace();
        }
    }

使用异步客户端

如果你阅读源码可以发现 c是基于 MqttAsyncClient 封装的,也就是意味着学会了MqttAsyncClient 就学会了使用整个 Paho Java Client。下面详细介绍如何使用 MqttAsyncClient。

setp 1 构造 MqttAsyncClient

MqttAsyncClient 提供了5个构造函数,我们一般只需要使用前两个就行了

public MqttAsyncClient(String serverURI, String clientId) throws MqttException

public PahoMqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException
  • 第一个参数:服务端URI接收String类型的参数但只支持ssl://tcp://开头的地址,如果不填端口号默认tcp是1883,ssl是8883
  • 第二个参数:clientId 不填会报错, clientId是用来服务端确定唯一的Client标识;
  • 第二个参数:MqttClientPersistence 文档上说是用来持久化出入站消息的。
setp 2 配置连接参数

TODO //

setp 3 监听消息接收

TODO //

setp 4 订阅主题

TODO //

setp 5 发送消息

TODO //

使用Rx封装客户端

RxPahoClient 是基于 MqttAsyncClient + MqttAsyncClient 封装的RxJava客户端,带有异步和同步的调用方式。使用方式和MqttAsyncClient是很相似的。

以下是伪代码:

RxPahoClient client = new RxPahoClient(createConnectionParams());
//创建之后就可以使用Rx方式调用了

//添加监听
client.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable cause) {
		//连接丢失
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
		//消息抵达
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
		//发送完成
    }
})

//连接
client.connect().observeOn(AndroidSchedulers.mainThread()).subscribe();
client.connectWithResult().waitForCompletion();
//订阅
client.subscribe("topicName", QoS.AT_MOST_ONCE).observeOn(AndroidSchedulers.mainThread()).subscribe();
client.subscribeWithResponse(new String[]{"topicName"}, new QoS[]{QoS.AT_MOST_ONCE}).waitForCompletion(10000);
//取消订阅
client.unsubscribe("topicName").observeOn(AndroidSchedulers.mainThread()).subscribe();
//发布
client.publish("topicName","Hello".getBytes(),QoS.AT_MOST_ONCE,false).observeOn(AndroidSchedulers.mainThread()).subscribe();
//断开连接
client.disconnect().observeOn(AndroidSchedulers.mainThread()).subscribe();
//安全关闭,关闭成功后会释放资源(建议使用)
client.closeSafety().observeOn(AndroidSchedulers.mainThread()).subscribe();
//强制关闭,关闭成功后会释放资源,强制关闭会堵塞调用线程(建议优先使用disconnect,当不能关闭时再使用强制关闭)
client.disconnectForcibly(100,200);
//是否连接
client.isConnected();
//是否关闭(关闭后该RxPahoClient就不能使用了,需要重新创建)
client.isClosed();

【注意】:RxPahoClient 如果想要自动重连请先打开参数automaticReconnect

【注意】:首次连接失败不会自动重连,就算设置了automaticReconnect也不会重连

在后台运行客户端(Service)

后台运行例子:后台运行意味着需要有后台组件,让RxRxPahoClient 依赖于后台组件生命周期运行。

ConnectionService.java

public class ConnectionService extends Service {

    private static final String TAG = "ConnectionService";

    private final ConnectionBinder binder = new ConnectionBinder();


    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        //直接通过绑定启动后台服务很快就会被系统杀死
        LogX.d(TAG, "==onBind==");
        return binder;
    }


    @Override
    public void onCreate() {
        super.onCreate();
        LogX.d(TAG, "==onCreate==");
    }


    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        LogX.d(TAG, "==onStartCommand==");
        return super.onStartCommand(intent, flags, startId);
    }


    @Override
    public void onDestroy() {
        super.onDestroy();
        LogX.d(TAG, "==onDestroy==");
        if (binder.isInstalled()) {
            //尝试关闭并释放资源
            final Disposable rxCloseSafety = binder.getClient()
                    .closeForcibly(500, 1000)
                    .subscribe(() -> {
                        LogX.d(TAG, "==disconnectForcibly success==");
                    }, throwable -> {
                        LogX.d(TAG, "==disconnectForcibly failure:" + throwable.toString());
                    });
        }
    }
}

ConnectionBinder.java

public class ConnectionBinder extends AbstractPahoConnServiceBinder {

    private static final String TAG = "ConnectionBinder";

    final List<OnRecMsgListener> recMsgListenerList = new ArrayList<>();
    final List<OnConnectedListener> connectedListenerList = new ArrayList<>();

    Handler mHandler = new Handler(Looper.myLooper());

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        LogX.d(TAG,"MQTT 消息抵达,topic:" + topic + ",message:" + new String(message.getPayload()));
        mHandler.post(() -> {
            //接受到消息会回调这里
            final Iterator<OnRecMsgListener> it = recMsgListenerList.iterator();
            while (it.hasNext()) {
                it.next().onReceiveMessage(topic, message.getPayload(), MqttUtils.int2QoS(message.getQos()));
            }
        });
    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        LogX.d(TAG,"MQTT 连接完成,是否重连:" + reconnect + ",server uri:" + serverURI);
        if (reconnect) {
            // 如果cleanSession是true,重连后需要重新订阅topic
        }
        final Iterator<OnConnectedListener> it = connectedListenerList.iterator();
        while (it.hasNext()) {
            it.next().onConnectComplete(reconnect, serverURI);
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        super.connectionLost(cause);
        LogX.e(TAG,"MQTT 失去连接:" + cause.toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        super.deliveryComplete(token);
        try {
            LogX.d(TAG,"MQTT 发布成功,topic:" + Arrays.toString(token.getTopics()) +
                    ",message:" + new String(token.getMessage().getPayload()));
        } catch (Exception e) {
            LogX.d(TAG,"MQTT 消息发布成功");
        }
    }


    public void addOnRecMsgListener(ConnectionBinder.OnRecMsgListener recMsgListener) {
        if (!recMsgListenerList.contains(recMsgListener)) {
            recMsgListenerList.add(recMsgListener);
        }
    }

    public void removeOnRecMsgListener(ConnectionBinder.OnRecMsgListener recMsgListener) {
        recMsgListenerList.remove(recMsgListener);
    }

    public void addOnConnectedListener(ConnectionBinder.OnConnectedListener connectedListener) {
        if (!connectedListenerList.contains(connectedListener)) {
            connectedListenerList.add(connectedListener);
        }
    }

    public void removeOnConnectedListener(ConnectionBinder.OnConnectedListener connectedListener) {
        connectedListenerList.remove(connectedListener);
    }

    //-------------------------------------------------------------------------------------------

    public interface OnRecMsgListener {
        /**
         * 接收到消息
         */
        @MainThread
        void onReceiveMessage(String topic, byte[] payload, QoS qoS);
    }

    public interface OnConnectedListener {
        /**
         * 连接成功
         */
        @MainThread
        void onConnectComplete(boolean reconnect, String serverURI);
    }

    //-------------------------------------------------------------------------------------------
}

DemoActivity 中使用

以下是伪代码:

private ConnectionBinder mBinder;   

private final ServiceConnection mServiceConnection = new ServiceConnection() {
        @Override
        public void onServiceConnected(ComponentName name, IBinder service) {
            mBinder = (ConnectionBinder) service;
            ToastUtils.showToast("MQTT后台服务绑定成功");
            
            install(createRxPahoClient());
        }

        @Override
        public void onServiceDisconnected(ComponentName name) {
            mBinder = null;
            ToastUtils.showToast("MQTT后台服务绑定失败");
        }
    };

public ConnectionBinder binder() {
    return mBinder;
}

//是否绑定服务
public boolean isBind() {
    return binder() != null;
}
//是否安装RxPahoClient 到服务
public boolean isInstalled() {
    return binder().isInstalled();
}
//比对已经安装的RxPahoClient的参数和object连接对象参数
public boolean isSame(Object object) {
    return binder().isSame(object);
}
//是否已经连接成功
public boolean isConnected() {
    return isInstalled() && binder().getClient().isConnected();
}
//是否已经关闭
public boolean isClosed() {
    return binder().getClient().isClosed();
}
//安装
public void install(RxPahoClient client) {
    binder().install(client);
}
//卸载
public void uninstall() {
    binder().uninstall();
}
//获取已经订阅的主题列表
public List<Pair<String, QoS>> getSubList() {
    return binder().getClient().getActiveSubs();
}
//添加消息接收监听
public void addOnRecMsgListener(ConnectionBinder.OnRecMsgListener recMsgListener) {
    binder().addOnRecMsgListener(recMsgListener);
}
//移除消息接收监听
public void removeOnRecMsgListener(ConnectionBinder.OnRecMsgListener recMsgListener) {
    binder().removeOnRecMsgListener(recMsgListener);
}
//添加连接成功监听
public void addOnConnectedListener(ConnectionBinder.OnConnectedListener connectedListener) {
    binder().addOnConnectedListener(connectedListener);
}
//移除连接成功监听
public void removeOnConnectedListener(ConnectionBinder.OnConnectedListener connectedListener) {
    binder().removeOnConnectedListener(connectedListener);
}


@Override
protected void onCreate(Bundle savedInstanceState) {
	......
   //绑定服务,绑定成功后说明服务起来了
   bindService(this, mServiceConnection,Context.BIND_AUTO_CREATE);
}

ConnectionParams 连接参数说明

ConnectionParams.newBuilder(String serverURI)
    			//【必填】支持两种类型的连接{tcp://localhost:1883,ssl://localhost:8883}如果没有指定端口tcp:// 默认使用1883 ,ssl:// 默认 8883
               .serverURI()
    			//【必填】它会覆盖serverURI,客户端连接时会从第一个开始尝试,直到连接成功,或者所有都失败;如果其中地址指向不同的服务器,cleanSession应当设置为false
    		   .setServerURIs(String[] serverURIs)
    			//如果你不想服务端记住你对话请设置为true,默认是true
               .cleanSession(boolean cleanSession)
    			//用户名,默认无
               .username(String username)
    			//密码,默认无
     		   .password(String password)
    			//设置遗嘱消息,默认无
               .setWill(@NonNull String willTopic, @NonNull Message willMessage)
               //设置“保持存活”间隔(秒)。客户端与服务端ping消息的时间间隔,单位是秒,设置为0时表示禁用,默认15秒
               .keepAlive(int keepAlive)
               //【必填】 clientId是用来服务端确定唯一的Client标识 不允许为空或者字符长度超过65535
               .clientId(String clientId)
    		   //连接超时间(单位秒),默认10秒
               .connectionTimeout(int connectionTimeout)
    			//重连的最大等待时间(单位ms)当设置了断线自动重连(automaticReconnect)后,会每个一段时间进行自动重连,如果连不上则进行延时后再尝试(延时算法:指数补偿),当达到最大时延(maxReconnectDelay)后这个过程会重新来,直到又达到最大时延,如此周期。默认 128000
               .maxReconnectDelay(int maxReconnectDelay)
    			//自动重连(默认false)
               .automaticReconnect(boolean automaticReconnect)
   				// 设置MQTT版本。默认操作是与版本 3.1.1 连接,如果失败,则返回到 3.1。版本 3.1.1 或 3.1 可以分别使用MQTT_VERSION_3_1_1或MQTT_VERSION_3_1选项来具体选择,无需回退。
               .mqttVersion(int mqttVersion)
    			//设置SocketFactory
    		   .socketFactory(SocketFactory socketFactory)
    			//设置 Hostname 验证
    			.sslHostnameVerifier(HostnameVerifier sslHostnameVerifier)
               .build();

SSL支持

参考 SSLDemoActivity.java

可能出现的疑难杂症和解决方案

TODO //

重连和首次启动连接异常处理方案

TODO //

其他控件使用

AVLoadingIndicatorView 使用

img

Indicator table

Col 1 Col 2 Col 3 Col 4
BallPulseIndicator BallGridPulseIndicator BallClipRotateIndicator BallClipRotatePulseIndicator
SquareSpinIndicator BallClipRotateMultipleIndicator BallPulseRiseIndicator BallRotateIndicator
CubeTransitionIndicator BallZigZagIndicator BallZigZagDeflectIndicator BallTrianglePathIndicator
BallScaleIndicator LineScaleIndicator LineScalePartyIndicator BallScaleMultipleIndicator
BallPulseSyncIndicator BallBeatIndicator LineScalePulseOutIndicator LineScalePulseOutRapidIndicator
BallScaleRippleIndicator BallScaleRippleMultipleIndicator BallSpinFadeLoaderIndicator LineSpinFadeLoaderIndicator
TriangleSkewSpinIndicator PacmanIndicator BallGridBeatIndicator SemiCircleSpinIndicator

开源使用和参考文档

About

MQTT Client

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages