Mica-mqtt
添加依赖
<mica-mqtt.version>2.2.6</mica-mqtt.version>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-client</artifactId>
<version>${mica-mqtt.version}</version>
</dependency>
package com.litongjava.mica.mqtt.client.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import net.dreamlu.iot.mqtt.core.client.IMqttClientConnectListener;
/**
* Client Connection Status Listening
*/
public class MqttClientConnectListener implements IMqttClientConnectListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);
@Override
public void onConnected(ChannelContext context, boolean isReconnect) {
if (isReconnect) {
logger.info("Reconnect mqtt server reconnected successfully");
} else {
logger.info("Connection to mqtt server successful");
}
}
@Override
public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
logger.error("mqtt link broken remark:{} isRemove:{}", remark, isRemove, throwable);
}
}
package com.litongjava.mica.mqtt.client.config;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
import org.tio.core.ChannelContext;
import com.litongjava.jfinal.aop.annotation.Bean;
import com.litongjava.jfinal.aop.annotation.Configuration;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import net.dreamlu.iot.mqtt.core.client.MqttClientCreator;
@AConfiguration
@Slf4j
public class MicaMQTTClientConfig {
@ABean
public MqttClient MqttClient() {
// 初始化 mqtt 客户端
MqttClientCreator creator = MqttClient.create().ip("192.168.3.9").port(1883).username("mica").password("mica");
// 连接监听
MqttClient client = creator.connectListener(new MqttClientConnectListener()).willMessage(builder -> {
builder.topic("/test/offline").messageText("down").retain(false).qos(MqttQoS.AT_MOST_ONCE); // 遗嘱消息
})
// 同步连接,也可以使用 connect() 异步(可以避免 broker 没启动照成启动卡住),但是下面的订阅和发布可能还没连接成功。
.connectSync();
// 订阅
client.subQos0("/test/123", new IMqttClientMessageListener() {
@Override
public void onSubscribed(ChannelContext context, String topicFilter, MqttQoS mqttQoS) {
// 订阅成功之后触发,可在此处做一些业务逻辑
log.info("topicFilter:{} MqttQoS:{} Subscription successful", topicFilter, mqttQoS);
}
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
log.info(topic + '\t' + new String(payload, StandardCharsets.UTF_8));
}
});
// 发送
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
client.publish("/test/client", "hello this is mica client".getBytes(StandardCharsets.UTF_8));
}
}, 1000, 2000);
return client;
}
}
这段代码主要涉及两个 Java 类,它们是用于配置和实现 MQTT 客户端的功能。让我们逐个进行解释:
MqttClientConnectListener 类
这个类实现了 IMqttClientConnectListener
接口,用于监听 MQTT 客户端的连接状态。
- 方法:
onConnected
: 当客户端成功连接到 MQTT 服务器时调用。如果是重新连接(isReconnect
为true
),则记录“重连成功”的消息;否则,记录“连接成功”的消息。onDisconnect
: 当客户端与 MQTT 服务器的连接断开时调用。记录断开连接的详细信息和错误(如果有)。
MicaMQTTClientConfig 类
这个类使用了 JFinal 的 @AConfiguration
和 @ABean
注解,表明它是一个配置类,用于初始化和配置 MQTT 客户端。
- 方法:
MqttClient
: 定义了一个MqttClient
Bean。在这个方法中,执行了以下操作:- 初始化 MQTT 客户端:使用
MqttClient.create()
创建一个 MQTT 客户端实例,并设置了服务器的 IP 地址、端口、用户名和密码。 - 设置连接监听器:添加了前面定义的
MqttClientConnectListener
实例作为连接监听器。 - 配置遗嘱消息:设置了客户端的遗嘱消息,当客户端意外断开连接时,服务器将会发布这条消息到指定的主题(
/test/offline
)。 - 同步连接:调用
connectSync()
方法来同步连接到 MQTT 服务器。 - 订阅主题:订阅了
/test/123
主题,并定义了如何处理订阅成功事件和收到的消息。 - 定时发送消息:使用
Timer
定时向/test/client
主题发送消息。
- 初始化 MQTT 客户端:使用
这段代码通过 JFinal AOP 提供的注解方式配置了一个 MQTT 客户端。客户端在启动时会自动连接到 MQTT 服务器,订阅指定的主题,并定期向另一个主题发送消息。同时,它通过实现连接监听器来记录连接和断开事件。这样的设置在 IoT(物联网)应用中很常见,用于设备与 MQTT 服务器之间的通信。