Android-MQTT通信

添加依赖

1
2
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

添加权限

1
2
3
4
5
6
7
8
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<application
...
<service android:name="org.eclipse.paho.android.service.MqttService" />
</application>

创建MQTT管理器

  创建一个MQTTManager类,并添加一个方法使用单例模式获取实例

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MQTTManager {
private static MQTTManager mqttManager;

/**
* 通过单例模式获取对象
*/
public synchronized static MQTTManager getInstance() {
if (mqttManager == null) {
mqttManager = new MQTTManager();
}
return mqttManager;
}
}

初始化MQTT客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// MQTT地址 = host + port + path
private final String serverUri = "ws://127.0.0.1:8080/mqtt";

private final String userName = "admin"; // 用户名
private final String password = "admin"; // 密码

private MqttAndroidClient mqttAndroidClient; // 是对MqttClient进行的再封装拓展类
private MqttConnectOptions mqttConnectOptions;// MQTT连接配置
/**
* 初始化
* @param context 上下文
*/
public void init(Context context) {
// 客户端ID,这里是一个自定义方法,获取手机的IMEI码
String clientId = PhoneInfoUtils.getImei(context);

// MQTT连接配置
mqttConnectOptions = new MqttConnectOptions();
// 清除缓存
mqttConnectOptions.setCleanSession(true);
// 超时时间
mqttConnectOptions.setConnectionTimeout(5);
// 心跳包发送间隔
mqttConnectOptions.setKeepAliveInterval(5);
// 连接丢失后是否自动连接
mqttConnectOptions.setAutomaticReconnect(true);
// 用户名
mqttConnectOptions.setUserName(userName);
// 密码
mqttConnectOptions.setPassword(password.toCharArray());
// 设置遗言
String willContent = "{ \"data\": { \"Carname\":\"\", \"pic\":\"\", \"type\":\"1\" }, \"msg\":\"网络不佳,设备掉线\"}";
mqttConnectOptions.setWill(topic_send, willContent.getBytes(), 2, true);

// MQTT客户端
mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId);
// 设置MQTT连接回调
mqttAndroidClient.setCallback(new MyMqttCallback());
}

/**
* MQTT连接回调
*/
private static class MyMqttCallback implements MqttCallback {

@Override
public void connectionLost(Throwable cause) {
cause.printStackTrace();
LogUtils.d(getClass(), "连接断开 = " + cause);

// 连接断开后重新建立连接
// 这里可以添加一些条件,比如检查网络是否稳定
MQTTManager.getInstance().connect();
}

@Override
public void messageArrived(String topic, MqttMessage message) {
LogUtils.d(getClass(), "主题 = " + topic);
LogUtils.d(getClass(), "收到消息 = " + new String(message.getPayload()));
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
LogUtils.d(getClass(), "消息已到达");
}
}

连接与断开连接MQTT

  订阅所使用的方法在下面会写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

public static final String topic_read = "/photograph";// 预设的主题,用于接收
public static final String topic_send = "/photo"; // 预设的主题,用于发送

/**
* 连接mqtt
*/
public void connect() {
try {
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogUtils.d(getClass(), "MQTT连接成功");
// 连接成功后可以订阅一个主题,以接收该主题中的消息
subscribe(topic_read, 2);
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
LogUtils.d(getClass(), "MQTT连接失败 = " + exception);
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}


/**
* 断开连接
*/
public void disconnect() {
try {
mqttAndroidClient.disconnect(null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogUtils.d(getClass(), "断开连接成功");
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
LogUtils.d(getClass(), "断开连接失败 = " + exception);
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}

订阅与取消订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 订阅主题
* @param topic 要订阅的目标主题名
* @param qos 0 – 最多发一次; 1 – 最少发一次; 2 – 保证收一次
*/
public void subscribe(String topic, int qos) {
try {
mqttAndroidClient.subscribe(topic, qos, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogUtils.d(getClass(), "MQTT订阅成功");
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
LogUtils.d(getClass(), "MQTT订阅失败");
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 取消订阅主题
* @param topic 取消订阅的目标主题名
*/
public void unsubscribe(String topic) {
try {
mqttAndroidClient.unsubscribe(topic, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogUtils.d(getClass(), "MQTT取消订阅成功");
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
LogUtils.d(getClass(), "MQTT取消订阅失败 = " + exception);
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}

发布主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 在主题中发布消息
* @param topic 目标主题
* @param msg 内容消息
* @param qos 0 – 最多发一次; 1 – 最少发一次; 2 – 保证收一次
* @param retained 设置保留
*/
public void publish (String topic, String msg, int qos, boolean retained) {
try {
MqttMessage message = new MqttMessage();

if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.KITKAT) {
message.setPayload(msg.getBytes(StandardCharsets.UTF_8));
} else {
message.setPayload(msg.getBytes());
}

message.setQos(qos);
message.setRetained(retained);

// 推送消息
mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogUtils.d(getClass(), "发送消息成功");
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
LogUtils.d(getClass(), "发送消息失败 = " + exception);
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}

使用

  创建一个服务,服务绑定时先初始化MQTT管理器,然后通过connect()方法连接MQTT服务器,连接成功后会自动接收预设的订阅主题的消息

1
2
3
4
5
6
@Override
public void onCreate() {
super.onCreate();
MQTTManager.getInstance().init(this);
MQTTManager.getInstance().connect();
}

  服务销毁时时随即断开MQTT连接

1
2
3
4
5
6
@Override
public void onDestroy() {
super.onDestroy();
// 销毁服务时断开MQTT连接
MQTTManager.getInstance().disconnect();
}

  发布消息

1
MQTTManager.getInstance().publish(MQTTManager.topic_send, "测试消息", 2, true);

参考:
https://www.emqx.com/zh/blog/android-connects-mqtt-using-kotlin
https://blog.csdn.net/u014084081/article/details/121165147