EMQX
本文用来设置和运行一个基于 MQTT 协议的客户端程序,利用 Eclipse Paho MQTT 客户端库和 tio-boot 框架进行消息发布和订阅。
Maven 依赖
首先,<dependency>
部分是 Maven 构建配置,用于在你的项目中包含 Eclipse Paho MQTT 客户端库。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
app.properties
eqmx 配置
emqx.broker=tcp://192.168.3.9:1883
emqx.username=username
emqx.password=password
emqx.topic=test/topic
emqx.qos=2
HelloApp 类
这是一个使用 Litongjava Tio 框架启动应用程序的主类。@AComponentScan
注解自动扫描和加载组件。
@AComponentScan
public class HelloApp {
public static void main(String[] args) {
long start = System.currentTimeMillis();
TioApplication.run(HelloApp.class, args);
long end = System.currentTimeMillis();
System.out.println((end - start) + "ms");
}
}
SampleCallback 类
这是一个实现了 MqttCallback
接口的类。它提供了三个方法来处理与 MQTT 消息相关的事件:
connectionLost(Throwable cause)
: 当连接丢失时调用。messageArrived(String topic, MqttMessage message)
: 当收到新消息时调用。deliveryComplete(IMqttDeliveryToken token)
: 当消息成功发送后调用。
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class SampleCallback implements MqttCallback {
// 连接丢失
public void connectionLost(Throwable cause) {
System.out.println("connection lost:" + cause.getMessage());
}
// 收到消息
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Received message: \n topic:" + topic + "\n Qos:" + message.getQos() + "\n payload:"
+ new String(message.getPayload()));
}
// 消息传递成功
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete");
}
}
MqttClientUtils 类
这个工具类用于简化 MQTT 客户端的操作。它提供了保存客户端、发布消息等功能。
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttClientUtils {
private static MqttClient client;
private static String topic;
private static int qos;
public static void init(MqttClient client, String topic, int qos) {
MqttClientUtils.client = client;
MqttClientUtils.topic = topic;
MqttClientUtils.qos = qos;
}
public static MqttClient getClient() {
return client;
}
public static void publish(MqttMessage message) {
message.setQos(qos);
try {
client.publish(topic, message);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static void publishWithQos(MqttMessage message) {
try {
client.publish(topic, message);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
EmqxClientConfig 类
这是一个配置类,用于初始化 MQTT 客户端和连接设置。它使用 EnvUtils
从环境或配置文件中获取配置,并初始化一个 MQTT 客户端实例。该实例配置了连接选项、回调处理以及订阅的 topic。此外,还有一个销毁方法,用于在应用程序关闭时正确地断开 MQTT 客户端的连接。
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.web.hello.utils.MqttClientUtils;
import lombok.extern.slf4j.Slf4j;
@AConfiguration
@Slf4j
public class EmqxClientConfig {
@Initialization
public void config() {
String broker = EnvUtils.get("emqx.broker");
String username = EnvUtils.get("emqx.username");
String password = EnvUtils.get("emqx.password");
String topic = EnvUtils.get("emqx.topic");
int qos = EnvUtils.getInt("emqx.qos", 2);
String clientId = MqttClient.generateClientId();
// 持久化
MemoryPersistence persistence = new MemoryPersistence();
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
// 设置认证信息
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// 设置回调
client.setCallback(new SampleCallback());
// 建立连接
log.info("Connecting to broker: " + broker);
client.connect(connOpts);
log.info("Connected to broker: " + broker);
// 订阅 topic
client.subscribe(topic, qos);
log.info("Subscribed to topic: " + topic);
TioBootServer.me().addDestroyMethod(() -> {
try {
client.disconnect();
log.info("Disconnected");
client.close();
} catch (MqttException e) {
e.printStackTrace();
}
});
MqttClientUtils.init(client,topic,qos);
} catch (MqttException me) {
log.error("reason " + me.getReasonCode());
log.error("msg " + me.getMessage());
log.error("loc " + me.getLocalizedMessage());
log.error("cause " + me.getCause());
log.error("excep " + me);
me.printStackTrace();
}
}
}
IndexController 类
这是一个简单的控制器,用于处理 HTTP 请求。当访问应用程序的根路径时,它会发布一个 "Hello World" 消息到 MQTT 服务器。
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.litongjava.annotation.RequestPath;
import com.litongjava.tio.web.hello.utils.MqttClientUtils;
@RequestPath("/")
public class IndexController {
@RequestPath()
public String index() {
String content = "Hello World";
MqttMessage message = new MqttMessage(content.getBytes());
MqttClientUtils.publish(message);
return "index";
}
}
发布消息
使用 MqttClientUtils
类来发布消息到 MQTT 服务器。
// 发布消息
MqttClient client = MqttClientUtils.getClient();
String content = "Hello World";
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
log.info("Message published");
主要涉及使用 Eclipse Paho MQTT 客户端库进行消息发布和订阅,以及使用 tio-boot 框架构建的 Web 应用程序。它演示了如何配置 MQTT 客户端,处理连接丢失、消息到达和消息传递完成的事件,发布和订阅消息,并通过 HTTP 接口触发消息发布。