一 spingboot整合mqtt

原理:

 二 操作案例

2.1 工程结构

 2.2 配置pom文件
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId>
<version>4.13</version> <scope>test</scope> </dependency> <!-- mqtt -->
<dependency> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId> </dependency>
<dependency> <groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId> </dependency> <dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId> </dependency> <!-- lombok -->
<dependency> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <version>1.16.16</version> </dependency> <!--
springBoot的启动器 --> <dependency> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.0.1.RELEASE</version> </dependency> <dependency>
<groupId>org.springframework</groupId> <artifactId>spring-web</artifactId>
<version>5.1.5.RELEASE</version> </dependency>
 2.3 配置application配置文件
server: port: 8081 spring: mqtt: username: admin # 账号 password: public # 密码
host-url: tcp://172.16.71.150:1883 # mqtt连接tcp地址 client-id: mq-dky-0813 #
客户端Id,每个启动的id要不同 default-topic: mq-dky-guolu # 默认主题 timeout: 100 # 超时时间
keepalive: 100
2.4 读取配置文件,初始客户端
package com.ljf.mqtt.demo.config; import
com.ljf.mqtt.demo.client.MqttPushClient; import lombok.Getter; import
lombok.Setter; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean; import
org.springframework.stereotype.Component; /** * @ClassName: MqttConfig *
@Description: TODO * @Author: liujianfu * @Date: 2021/08/16 14:43:39  *
@Version: V1.0 **/ @Component @ConfigurationProperties("spring.mqtt") @Setter
@Getter public class MqttConfig { @Autowired private MqttPushClient
mqttPushClient; /** * 用户名 */ private String username; /** * 密码 */ private
String password; /** * 连接地址 */ private String hostUrl; /** * 客户Id */ private
String clientId; /** * 默认连接话题 */ private String defaultTopic; /** * 超时时间 */
private int timeout; /** * 保持连接数 */ private int keepalive; @Bean public
MqttPushClient getMqttPushClient() { mqttPushClient.connect(hostUrl, clientId,
username, password, timeout, keepalive); // 以/#结尾表示订阅所有以test开头的主题
mqttPushClient.subscribe(defaultTopic, 0); return mqttPushClient; } }
2.4 订阅推送客户端
package com.ljf.mqtt.demo.client; import
com.ljf.mqtt.demo.listener.PushCallback; import
org.eclipse.paho.client.mqttv3.*; import
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import
org.slf4j.Logger; import org.slf4j.LoggerFactory; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.stereotype.Component; /** * @ClassName: MqttPushClient *
@Description: TODO * @Author: liujianfu * @Date: 2021/08/16 14:48:38  *
@Version: V1.0 **/ @Component public class MqttPushClient { private static
final Logger logger = LoggerFactory.getLogger(MqttPushClient.class); @Autowired
private PushCallback pushCallback; private static MqttClient client; private
static MqttClient getClient() { return client; } private static void
setClient(MqttClient client) { MqttPushClient.client = client; } /** * 客户端连接 *
* @param host ip+端口 * @param clientID 客户端Id * @param username 用户名 * @param
password 密码 * @param timeout 超时时间 * @param keepalive 保留数 */ public void
connect(String host, String clientID, String username, String password, int
timeout, int keepalive) { MqttClient client; try { client = new
MqttClient(host, clientID, new MemoryPersistence()); MqttConnectOptions options
= new MqttConnectOptions(); options.setCleanSession(true);
options.setUserName(username); options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client); try { client.setCallback(pushCallback);
client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch
(Exception e) { e.printStackTrace(); } } /** * 发布 * * @param qos 连接方式 * @param
retained 是否保留 * @param topic 主题 * @param pushMessage 消息体 */ public void
publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage(); message.setQos(qos);
message.setRetained(retained); message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null ==
mTopic) { logger.error("topic not exist"); } MqttDeliveryToken token; try {
token = mTopic.publish(message); token.waitForCompletion(); } catch
(MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) {
e.printStackTrace(); } } /** * 订阅某个主题 * * @param topic 主题 * @param qos 连接方式 */
public void subscribe(String topic, int qos) {
logger.info("==============开始订阅主题=========" + topic); try {
MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) {
e.printStackTrace(); } } }
2.5 定制监听订阅者
package com.ljf.mqtt.demo.listener; import
com.ljf.mqtt.demo.config.MqttConfig; import
org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import
org.eclipse.paho.client.mqttv3.MqttCallback; import
org.eclipse.paho.client.mqttv3.MqttClient; import
org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import
org.slf4j.LoggerFactory; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.stereotype.Component; /** * @ClassName: PushCallback *
@Description: TODO * @Author: liujianfu * @Date: 2021/08/16 14:52:20  *
@Version: V1.0 **/ @Component public class PushCallback implements MqttCallback
{ private static final Logger logger =
LoggerFactory.getLogger(PushCallback.class); @Autowired private MqttConfig
mqttConfig; private static MqttClient client; @Override public void
connectionLost(Throwable throwable) { // 连接丢失后,一般在这里面进行重连
logger.info("连接断开,可以做重连"); if (client == null || !client.isConnected()) {
mqttConfig.getMqttPushClient(); } } @Override public void messageArrived(String
topic, MqttMessage mqttMessage) throws Exception { // subscribe后得到的消息会执行到这里面
logger.info("接收消息主题 : " + topic); logger.info("接收消息Qos : " +
mqttMessage.getQos()); logger.info("接收消息内容 : " + new
String(mqttMessage.getPayload())); } @Override public void
deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); } }
2.6 发布数据
package com.ljf.mqtt.demo.controller; import
com.ljf.mqtt.demo.client.MqttPushClient; import com.ljf.mqtt.demo.utils.R;
import org.springframework.beans.factory.annotation.Autowired; import
org.springframework.web.bind.annotation.GetMapping; import
org.springframework.web.bind.annotation.RequestMapping; import
org.springframework.web.bind.annotation.RestController; /** * @ClassName:
PullController * @Description: TODO * @Author: liujianfu * @Date:
2021/08/16 14:56:18  * @Version: V1.0 **/ @RestController @RequestMapping("/")
public class PullController { @Autowired private MqttPushClient mqttPushClient;
/** * @author liujianfu * @description 测试发布主题 * @date 2021/8/16 15:04 * @param
[] * @return RUtils */ @GetMapping(value = "/publishTopic") public R
publishTopic(String sendMessage) { System.out.println("message:"+sendMessage);
sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";
mqttPushClient.publish(0,false,"mq-dky-guolu",sendMessage); return R.ok("OK");
} }
2.7 发布数据

1.发布数据:

 2.订阅消费数据

 3.emqx页面

4.在页面进行模拟

连接

订阅

 推送:

java代码客户端:

技术
下载桌面版
GitHub
Microsoft Store
SourceForge
Gitee
百度网盘(提取码:draw)
云服务器优惠
华为云优惠券
京东云优惠券
腾讯云优惠券
阿里云优惠券
Vultr优惠券
站点信息
问题反馈
邮箱:[email protected]
吐槽一下
QQ群:766591547
关注微信