Skip to content

Commit

Permalink
Switch to using an AlarmManager for MQTT keepalives, disable explicit…
Browse files Browse the repository at this point in the history
… ping on JobManager job
  • Loading branch information
growse committed Sep 27, 2021
1 parent 3bd3425 commit ecc1a8f
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Fix crash on startup where shared preferences has some invalid values
* Fix crash on being prompted to enable device location because of a callback not being initialized properly
* Fix crash due to a race when reverse geocoding the contacts
* Improve reliability of MQTT keepalive by using an Android AlarmManager rather than a timer

## Version 2.4.1

Expand Down
1 change: 1 addition & 0 deletions project/app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ dependencies {

implementation("org.greenrobot:eventbus:3.2.0")
implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5")
implementation("org.eclipse.paho:org.eclipse.paho.android.service:1.1.1")

implementation("com.squareup.okhttp3:okhttp:${okHttpVersion}")
implementation("com.squareup.okhttp3:logging-interceptor:${okHttpVersion}")
Expand Down
3 changes: 3 additions & 0 deletions project/app/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@
</intent-filter>
</service>

<service android:name="org.eclipse.paho.android.service.MqttService">
</service>

<receiver
android:name=".support.receiver.StartBackgroundServiceReceiver"
android:enabled="true"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.owntracks.android.services;

import static org.owntracks.android.support.RunThingsOnOtherThreads.NETWORK_HANDLER_THREAD_NAME;

import android.content.Context;
import android.content.SharedPreferences;
import android.os.Build;
import android.os.Looper;

import androidx.annotation.WorkerThread;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
Expand Down Expand Up @@ -57,24 +60,22 @@

import timber.log.Timber;

import static org.owntracks.android.support.RunThingsOnOtherThreads.NETWORK_HANDLER_THREAD_NAME;

public class MessageProcessorEndpointMqtt extends MessageProcessorEndpoint implements StatefulServiceMessageProcessor, OnModeChangedPreferenceChangedListener {
public static final int MODE_ID = 0;

private CustomMqttClient mqttClient;
private IMqttAsyncClient mqttClient;

private String lastConnectionId;
private static MessageProcessor.EndpointState state;

private MessageProcessor messageProcessor;
private RunThingsOnOtherThreads runThingsOnOtherThreads;
private Context applicationContext;
private final MessageProcessor messageProcessor;
private final RunThingsOnOtherThreads runThingsOnOtherThreads;
private final Context applicationContext;

private Parser parser;
private Preferences preferences;
private Scheduler scheduler;
private EventBus eventBus;
private final Parser parser;
private final Preferences preferences;
private final Scheduler scheduler;
private final EventBus eventBus;

MessageProcessorEndpointMqtt(MessageProcessor messageProcessor, Parser parser, Preferences preferences, Scheduler scheduler, EventBus eventBus, RunThingsOnOtherThreads runThingsOnOtherThreads, Context applicationContext) {
super(messageProcessor);
Expand All @@ -99,10 +100,6 @@ void reconnectAndSendKeepalive(Semaphore completionNotifier) {
if (!checkConnection()) {
reconnect();
}
if (checkConnection()) {
Timber.d("PING!");
mqttClient.ping();
}
} finally {
if (completionNotifier != null) {
completionNotifier.release();
Expand Down Expand Up @@ -192,7 +189,7 @@ public void messageArrived(String topic, MqttMessage message) {
}
};

private CustomMqttClient buildMqttClient() throws URISyntaxException, MqttException {
private IMqttAsyncClient buildMqttClient() throws URISyntaxException, MqttException {
Timber.d("Initializing new mqttClient");

String scheme = "tcp";
Expand All @@ -211,7 +208,8 @@ private CustomMqttClient buildMqttClient() throws URISyntaxException, MqttExcept
String connectString = new URI(scheme, null, preferences.getHost(), preferences.getPort(), null, null, null).toString();
Timber.d("client id :%s, connect string: %s", cid, connectString);
try {
CustomMqttClient mqttClient = new CustomMqttClient(connectString, cid, new MqttClientMemoryPersistence());

IMqttAsyncClient mqttClient = new MqttAndroidClient(applicationContext, connectString, cid, new MqttClientMemoryPersistence());
mqttClient.setCallback(iCallbackClient);
return mqttClient;
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -265,17 +263,11 @@ private synchronized void connectToBroker() throws MqttConnectionException, Conf

try {
Timber.v("MQTT connecting synchronously");
this.mqttClient.connect(mqttConnectOptions).waitForCompletion();
this.mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
changeState(EndpointState.ERROR.withError(e));
throw new MqttConnectionException(e);
}
Timber.d("MQTT Connected success.");
scheduler.scheduleMqttMaybeReconnectAndPing(mqttConnectOptions.getKeepAliveInterval());

changeState(EndpointState.CONNECTED);

sendMessageConnectPressure = 0; // allow new connection attempts from queueMessageForSending
}

private MqttConnectOptions getMqttConnectOptions() throws MqttConnectionException {
Expand Down Expand Up @@ -381,11 +373,16 @@ private void setWill(MqttConnectOptions m) {
}

private String getConnectionId() {
return String.format("%s/%s", mqttClient.getCurrentServerURI(), mqttClient.getClientId());
return String.format("%s/%s", mqttClient.getServerURI(), mqttClient.getClientId());
}

private void onConnect() {
Timber.d("MQTT connected!. Running onconnect handler (threadID %s)", Thread.currentThread());
scheduler.scheduleMqttMaybeReconnectAndPing(preferences.getKeepalive());

changeState(EndpointState.CONNECTED);

sendMessageConnectPressure = 0; // allow new connection attempts from queueMessageForSending
scheduler.cancelMqttReconnect();
// Check if we're connecting to the same broker that we were already connected to
String connectionId = getConnectionId();
Expand Down Expand Up @@ -451,23 +448,6 @@ private int[] getSubTopicsQos(String[] topics) {
return qos;
}

private void unsubscribe(String[] topics) {
if (!isConnected()) {
Timber.e("subscribe when not connected");
return;
}

for (String s : topics) {
Timber.v("unsubscribe() - Will unsubscribe from: %s", s);
}

try {
mqttClient.unsubscribe(topics);
} catch (Exception e) {
Timber.e(e, "Unable to unsubscribe from topics");
}
}

private void disconnect(boolean fromUser) {
Timber.d("disconnect. Manually triggered? %s. ThreadID: %s", fromUser, Thread.currentThread());
if (isConnecting()) {
Expand Down Expand Up @@ -646,18 +626,6 @@ public boolean containsKey(String key) {
}
}

private static final class CustomMqttClient extends MqttAsyncClient {

CustomMqttClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
super(serverURI, clientId, persistence);
}

void ping() {
if (comms != null)
comms.checkForActivity();
}
}

@Override
int getModeId() {
return MODE_ID;
Expand Down

0 comments on commit ecc1a8f

Please sign in to comment.