0%

安卓mqtt集成

在 Android 中使用 MQTT(Message Queuing Telemetry Transport) 与服务器交互,是实现物联网、实时通信等场景的常用方式

一、准备工作
在 build.gradle(:app) 中添加:
implementation ‘org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5’
implementation ‘org.eclipse.paho:org.eclipse.paho.android.service:1.1.1’

二、配置权限
在 AndroidManifest.xml 添加:

三、初始化 MQTT 客户端

import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;

/**

  • @author xqm

  • @date 2025/7/18 17:01

  • @description MqttManager 类功能说明
    */
    public class MqttManager {
    private static final String TAG = “MqttManager”;
    private static MqttManager instance;
    private final MqttAndroidClient mqttAndroidClient;
    private final MqttConnectOptions connectOptions;

    private String serverUri = “tcp://broker.hivemq.com:1883”; // 可替换为你自己的服务器
    private final String clientId = “AndroidClient_” + System.currentTimeMillis();
    private final Context context;

    private OnMessageReceiveListener onMessageReceiveListener;

    public MqttManager(Context context,String serverUriAdd,String clientIdAdd,String userName,String pwd) {
    this.context = context.getApplicationContext();
    mqttAndroidClient = new MqttAndroidClient(context, serverUriAdd, clientIdAdd);
    connectOptions = new MqttConnectOptions();
    connectOptions.setUserName(userName);
    connectOptions.setPassword(pwd.toCharArray());
    connectOptions.setCleanSession(true);
    connectOptions.setAutomaticReconnect(false);

    initCallback();
    }

    // 单例获取方法
    public static synchronized MqttManager getInstance(Context context, String serverUriAdd, String clientIdAdd, String userName, String pwd) {
    if (instance == null) {
    instance = new MqttManager(context, serverUriAdd, clientIdAdd, userName, pwd);
    }
    return instance;
    }

    // 可选:无参数获取(需提前初始化)
    public static synchronized MqttManager getInstance() {
    if (instance == null) {
    throw new IllegalStateException(“MqttManager 尚未初始化!”);
    }
    return instance;
    }

    public boolean isConnected() {
    return mqttAndroidClient != null && mqttAndroidClient.isConnected();
    }

    private void initCallback() {
    mqttAndroidClient.setCallback(new MqttCallback() {
    @Override
    public void connectionLost(Throwable cause) {
    Log.e(TAG, “连接丢失,准备重连”, cause);
    reconnectWithDelay();
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
    String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
    Log.d(TAG, “收到消息: topic=” + topic + “, message=” + payload);
    if (onMessageReceiveListener != null) {
    onMessageReceiveListener.onMessage(topic, payload);
    }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    Log.d(TAG, “消息发送完成”);
    }
    });
    }

    public void connect() {
    try {
    mqttAndroidClient.connect(connectOptions, null, new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
    Log.d(TAG, “连接成功”);
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    Log.e(TAG, “连接失败”, exception);
    reconnectWithDelay();
    }
    });
    } catch (MqttException e) {
    Log.e(TAG, “连接异常”, e);
    }
    }

    private void reconnectWithDelay() {
    new Handler(Looper.getMainLooper()).postDelayed(this::connect, 3000);
    }

    public void subscribe(String topic) {
    try {
    mqttAndroidClient.subscribe(topic, 1, null, new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken asyncActionToken) {
    Log.d(TAG, “订阅成功: “ + topic);
    }

    @Override
    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    Log.e(TAG, “订阅失败: “ + topic, exception);
    }
    });
    } catch (MqttException e) {
    Log.e(TAG, “订阅异常”, e);
    }
    }

    public void publish(String topic, String message) {
    try {
    MqttMessage mqttMessage = new MqttMessage();
    mqttMessage.setPayload(message.getBytes(StandardCharsets.UTF_8));
    mqttAndroidClient.publish(topic, mqttMessage);
    Log.d(TAG, “发送消息: topic=” + topic + “, message=” + message);
    } catch (MqttException e) {
    Log.e(TAG, “发送失败”, e);
    }
    }

    public void disconnect() {
    try {
    mqttAndroidClient.disconnect();
    Log.d(TAG, “断开连接”);
    } catch (MqttException e) {
    Log.e(TAG, “断开异常”, e);
    }
    }

    public void setOnMessageReceiveListener(OnMessageReceiveListener listener) {
    this.onMessageReceiveListener = listener;
    }

    public interface OnMessageReceiveListener {
    void onMessage(String topic, String message);
    }

}

四、在 Activity 中使用
mqttManager = MqttManager.getInstance(getApplicationContext(), mqttUrl, clientId, userName, pwd);
mqttManager.setOnMessageReceiveListener((topic, message) -> {
runOnUiThread(() -> {
Toast.makeText(this, “收到: “ + message, Toast.LENGTH_SHORT).show();
});
});
mqttManager.connect();
mqttManager.subscribe(“your/topic”);

订阅多个 Topic(推荐做法)
mqttManager.subscribe(“drone/telemetry”); // 无人机遥测数据
mqttManager.subscribe(“drone/status”); // 状态变化
mqttManager.subscribe(“drone/camera”); // 摄像头控制消息
mqttManager.subscribe(“drone/command”); // 控制指令

也可以封装成数组遍历订阅:
String[] topics = {
“drone/telemetry”,
“drone/status”,
“drone/camera”,
“drone/command”
};

for (String topic : topics) {
mqttManager.subscribe(topic);
}
messageArrived 回调中区分 Topic 内容
mqttManager.setOnMessageReceiveListener((topic, message) -> {
switch (topic) {
case “drone/telemetry”:
// 解析遥测 JSON
break;
case “drone/status”:
// 更新状态 UI
break;
case “drone/command”:
// 响应控制指令
break;
default:
Log.w(“MQTT”, “未知 topic:” + topic);
break;
}
});

扩展建议
如你有“Topic + 消息体 JSON”的组合格式,可以创建实体类并使用 Gson 解析:
TelemetryData data = new Gson().fromJson(message, TelemetryData.class);

五、服务端推荐 MQTT Broker
免费测试:
HiveMQ: tcp://broker.hivemq.com:1883
Eclipse: tcp://iot.eclipse.org:1883
自建服务器推荐:
EMQX
Mosquitto

六、注意事项
保持后台连接:可以考虑 MQTT 放入前台服务中,保证连接不断。
网络断开重连处理:建议实现 connectionLost() 中的自动重连机制。
安全连接:生产环境使用 ssl:// + 用户认证更安全。