在 Android 中使用 MQTT(Message Queuing Telemetry Transport)与服务器交互,是实现物联网、实时通信等场景的常用方式。
一、准备工作
在 build.gradle(:app) 中添加:
// Paho MQTT core client plus the Android service wrapper that MqttAndroidClient lives in.
// NOTE(review): android.service 1.1.1 is an old release; it is reported to have problems on
// Android 12+ targets (PendingIntent mutability) - verify against your targetSdkVersion.
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
二、配置权限
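在 AndroidManifest.xml 添加:
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
并在 <application> 节点内注册 Paho 提供的服务:
<service android:name="org.eclipse.paho.android.service.MqttService" />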
三、初始化 MQTT 客户端
import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.Set;
/**
@author xqm
@date 2025/7/18 17:01
@description MqttManager 类功能说明
*/
public class MqttManager {
private static final String TAG = “MqttManager”;
private static MqttManager instance;
private final MqttAndroidClient mqttAndroidClient;
private final MqttConnectOptions connectOptions;private String serverUri = “tcp://broker.hivemq.com:1883”; // 可替换为你自己的服务器
private final String clientId = “AndroidClient_” + System.currentTimeMillis();
private final Context context;private OnMessageReceiveListener onMessageReceiveListener;
public MqttManager(Context context,String serverUriAdd,String clientIdAdd,String userName,String pwd) {
this.context = context.getApplicationContext();
mqttAndroidClient = new MqttAndroidClient(context, serverUriAdd, clientIdAdd);
connectOptions = new MqttConnectOptions();
connectOptions.setUserName(userName);
connectOptions.setPassword(pwd.toCharArray());
connectOptions.setCleanSession(true);
connectOptions.setAutomaticReconnect(false);
initCallback();
}// 单例获取方法
public static synchronized MqttManager getInstance(Context context, String serverUriAdd, String clientIdAdd, String userName, String pwd) {
if (instance == null) {
instance = new MqttManager(context, serverUriAdd, clientIdAdd, userName, pwd);
}
return instance;
}// 可选:无参数获取(需提前初始化)
public static synchronized MqttManager getInstance() {
if (instance == null) {
throw new IllegalStateException(“MqttManager 尚未初始化!”);
}
return instance;
}public boolean isConnected() {
return mqttAndroidClient != null && mqttAndroidClient.isConnected();
}private void initCallback() {
mqttAndroidClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Log.e(TAG, “连接丢失,准备重连”, cause);
reconnectWithDelay();
}
@Override
public void messageArrived(String topic, MqttMessage message) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
Log.d(TAG, “收到消息: topic=” + topic + “, message=” + payload);
if (onMessageReceiveListener != null) {
onMessageReceiveListener.onMessage(topic, payload);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.d(TAG, “消息发送完成”);
}
});
}public void connect() {
try {
mqttAndroidClient.connect(connectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, “连接成功”);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, “连接失败”, exception);
reconnectWithDelay();
}
});
} catch (MqttException e) {
Log.e(TAG, “连接异常”, e);
}
}private void reconnectWithDelay() {
new Handler(Looper.getMainLooper()).postDelayed(this::connect, 3000);
}public void subscribe(String topic) {
try {
mqttAndroidClient.subscribe(topic, 1, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, “订阅成功: “ + topic);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, “订阅失败: “ + topic, exception);
}
});
} catch (MqttException e) {
Log.e(TAG, “订阅异常”, e);
}
}public void publish(String topic, String message) {
try {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes(StandardCharsets.UTF_8));
mqttAndroidClient.publish(topic, mqttMessage);
Log.d(TAG, “发送消息: topic=” + topic + “, message=” + message);
} catch (MqttException e) {
Log.e(TAG, “发送失败”, e);
}
}public void disconnect() {
try {
mqttAndroidClient.disconnect();
Log.d(TAG, “断开连接”);
} catch (MqttException e) {
Log.e(TAG, “断开异常”, e);
}
}public void setOnMessageReceiveListener(OnMessageReceiveListener listener) {
this.onMessageReceiveListener = listener;
}public interface OnMessageReceiveListener {
void onMessage(String topic, String message);
}
}
四、在 Activity 中使用
// Obtain the shared manager using the application context.
mqttManager = MqttManager.getInstance(getApplicationContext(), mqttUrl, clientId, userName, pwd);
mqttManager.setOnMessageReceiveListener((topic, message) -> {
    // Switch to the UI thread before showing the toast.
    runOnUiThread(() -> {
        Toast.makeText(this, "收到: " + message, Toast.LENGTH_SHORT).show();
    });
});
mqttManager.connect();
// NOTE(review): connect() completes asynchronously, so this subscribe() can run before the
// connection exists - confirm MqttManager defers it, or subscribe once connected.
mqttManager.subscribe("your/topic");
订阅多个 Topic(推荐做法)
mqttManager.subscribe("drone/telemetry"); // drone telemetry data
mqttManager.subscribe("drone/status");    // status changes
mqttManager.subscribe("drone/camera");    // camera control messages
mqttManager.subscribe("drone/command");   // control commands
也可以封装成数组遍历订阅:
// Topics to subscribe to, kept in one place so the list is easy to extend.
String[] topics = {
    "drone/telemetry",
    "drone/status",
    "drone/camera",
    "drone/command"
};
for (String topic : topics) {
    mqttManager.subscribe(topic);
}
messageArrived 回调中区分 Topic 内容
// Route each incoming message by the topic it arrived on.
mqttManager.setOnMessageReceiveListener((topic, message) -> {
    switch (topic) {
        case "drone/telemetry":
            // Parse the telemetry JSON
            break;
        case "drone/status":
            // Update the status UI
            break;
        case "drone/camera":
            // Handle camera control messages (this topic is subscribed above as well)
            break;
        case "drone/command":
            // Respond to the control command
            break;
        default:
            Log.w("MQTT", "未知 topic:" + topic);
            break;
    }
});
扩展建议
如你有“Topic + 消息体 JSON”的组合格式,可以创建实体类并使用 Gson 解析:
// Deserialize the JSON message body into the TelemetryData entity.
Gson gson = new Gson();
TelemetryData data = gson.fromJson(message, TelemetryData.class);
五、服务端推荐 MQTT Broker
免费测试:
HiveMQ: tcp://broker.hivemq.com:1883
Eclipse: tcp://mqtt.eclipseprojects.io:1883(旧地址 iot.eclipse.org 已停用)
自建服务器推荐:
EMQX
Mosquitto
六、注意事项
保持后台连接:可以考虑将 MQTT 连接放入前台服务(Foreground Service)中,保证连接不被系统中断。
网络断开重连处理:建议实现 connectionLost() 中的自动重连机制。
安全连接:生产环境使用 ssl:// + 用户认证更安全。