添加依赖
1 2
| implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
|
添加权限
1 2 3 4 5 6 7 8
| <uses-permission android:name="android.permission.INTERNET" /> <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<application ... <service android:name="org.eclipse.paho.android.service.MqttService" /> </application>
|
创建MQTT管理器
创建一个MQTTManager类,并添加一个方法使用单例模式获取实例
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class MQTTManager { private static MQTTManager mqttManager;
public synchronized static MQTTManager getInstance() { if (mqttManager == null) { mqttManager = new MQTTManager(); } return mqttManager; } }
|
初始化MQTT客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| private final String serverUri = "ws://127.0.0.1:8080/mqtt";
private final String userName = "admin"; private final String password = "admin";
private MqttAndroidClient mqttAndroidClient; private MqttConnectOptions mqttConnectOptions;
public void init(Context context) { String clientId = PhoneInfoUtils.getImei(context);
mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(5); mqttConnectOptions.setKeepAliveInterval(5); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(userName); mqttConnectOptions.setPassword(password.toCharArray()); String willContent = "{ \"data\": { \"Carname\":\"\", \"pic\":\"\", \"type\":\"1\" }, \"msg\":\"网络不佳,设备掉线\"}"; mqttConnectOptions.setWill(topic_send, willContent.getBytes(), 2, true);
mqttAndroidClient = new MqttAndroidClient(context, serverUri, clientId); mqttAndroidClient.setCallback(new MyMqttCallback()); }
private static class MyMqttCallback implements MqttCallback {
@Override public void connectionLost(Throwable cause) { cause.printStackTrace(); LogUtils.d(getClass(), "连接断开 = " + cause);
MQTTManager.getInstance().connect(); }
@Override public void messageArrived(String topic, MqttMessage message) { LogUtils.d(getClass(), "主题 = " + topic); LogUtils.d(getClass(), "收到消息 = " + new String(message.getPayload())); }
@Override public void deliveryComplete(IMqttDeliveryToken token) { LogUtils.d(getClass(), "消息已到达"); } }
|
连接与断开连接MQTT
订阅所使用的方法在下面会写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public static final String topic_read = "/photograph"; public static final String topic_send = "/photo";
public void connect() { try { mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.d(getClass(), "MQTT连接成功"); subscribe(topic_read, 2); }
@Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { exception.printStackTrace(); LogUtils.d(getClass(), "MQTT连接失败 = " + exception); } }); } catch (MqttException e) { e.printStackTrace(); } }
public void disconnect() { try { mqttAndroidClient.disconnect(null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.d(getClass(), "断开连接成功"); }
@Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { exception.printStackTrace(); LogUtils.d(getClass(), "断开连接失败 = " + exception); } }); } catch (MqttException e) { e.printStackTrace(); } }
|
订阅与取消订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
public void subscribe(String topic, int qos) { try { mqttAndroidClient.subscribe(topic, qos, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.d(getClass(), "MQTT订阅成功"); }
@Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogUtils.d(getClass(), "MQTT订阅失败"); } }); } catch (MqttException e) { e.printStackTrace(); } }
public void unsubscribe(String topic) { try { mqttAndroidClient.unsubscribe(topic, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.d(getClass(), "MQTT取消订阅成功"); }
@Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { exception.printStackTrace(); LogUtils.d(getClass(), "MQTT取消订阅失败 = " + exception); } }); } catch (MqttException e) { e.printStackTrace(); } }
|
发布主题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
|
public void publish (String topic, String msg, int qos, boolean retained) { try { MqttMessage message = new MqttMessage();
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.KITKAT) { message.setPayload(msg.getBytes(StandardCharsets.UTF_8)); } else { message.setPayload(msg.getBytes()); }
message.setQos(qos); message.setRetained(retained);
mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.d(getClass(), "发送消息成功"); }
@Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { exception.printStackTrace(); LogUtils.d(getClass(), "发送消息失败 = " + exception); } }); } catch (MqttException e) { e.printStackTrace(); } }
|
使用
创建一个服务,服务绑定时先初始化MQTT管理器,然后通过connect()方法连接MQTT服务器,连接成功后会自动接收预设的订阅主题的消息
1 2 3 4 5 6
| @Override public void onCreate() { super.onCreate(); MQTTManager.getInstance().init(this); MQTTManager.getInstance().connect(); }
|
服务销毁时时随即断开MQTT连接
1 2 3 4 5 6
| @Override public void onDestroy() { super.onDestroy(); MQTTManager.getInstance().disconnect(); }
|
发布消息
1
| MQTTManager.getInstance().publish(MQTTManager.topic_send, "测试消息", 2, true);
|
参考:
https://www.emqx.com/zh/blog/android-connects-mqtt-using-kotlin
https://blog.csdn.net/u014084081/article/details/121165147