创建springBoot应用
访问 http://start.spring.io/
引入web模块
pom.xml中添加支持web模块
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
热部署
Springboot2.*的热部署配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
单元测试
1,在pom包中添加spring-boot-starter-test包引用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
2,开发测试类 在测试类的类头部需要添加:@RunWith(SpringRunner.class)和@SpringBootTest注解,在测试方法的顶端添加@Test即可,最后在方法上点击右键run就可以运行。
/**
 * Spring Boot test that has {@link IHelloService} injected and invokes it.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTest {

    Logger logger = LoggerFactory.getLogger(AppTest.class);

    /** Service under test, injected via {@code @Autowired}. */
    @Autowired
    private IHelloService helloService;

    /** Logs the injected service instance, then calls {@code sayHello()} on it. */
    @Test
    public void test1() {
        final String line = "--- test helloService=" + helloService;
        logger.info(line);
        helloService.sayHello();
    }
}
整合springboot的代码
所需要的依赖
<!--mqtt-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.66</version>
</dependency>
消息publish端代码实现
mqtt发送消息的核心类,创建连接使用了单例的方式。
package com.mqtt.demo2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class MqttPushServer {
private MqttClient client;
private static volatile MqttPushServer mqttPushClient = null;
//重连次数
private int reConnTimes;
public int getReConnTimes() {
return this.reConnTimes;
}
public void setReConnTimes(int reConnTimes) {
if (this.isConnected()) {
reConnTimes = 0;
}
this.reConnTimes = reConnTimes;
}
public int getMaxReconnTimes() {
return PropertiesUtil.MQTT_MAXRECONNECTTIMES;
}
public int getReconnInterval() {
return PropertiesUtil.MQTT_RECONNINTERVAL;
}
public static MqttPushServer getInstance(){
if(null == mqttPushClient){
synchronized (MqttPushServer.class){
if(null == mqttPushClient){
mqttPushClient = new MqttPushServer();
}
}
}
return mqttPushClient;
}
private MqttPushServer() {
connect();
}
public void connect(){
try {
client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
/**
* clean session 值为false,既保留会话,那么该客户端上线的时候,并订阅了主题“r”,那么该主题会一直存在,即使客户端离线,该主题也仍然会记忆在EMQ服务器内存。
* 当客户端离线又上线时,仍然会接受到离线期间别人发来的publish消息(QOS=0,1,2).类似及时通讯软件,终端可以接受离线消息。
* 除非客户端主动取消订阅主题, 否则主题一直存在。另外,mnesia不会持久化session,subscription和topic,服务器重启则丢失。
* 当clean session 为true
* 该客户端上线,并订阅了主题“r”,那么该主题会随着客户端离线而删除。
* 当客户端离线又上线时,接受不到离线期间别人发来的publish消息
*
* 不管clean session的值是什么,当终端设备离线时,QoS=0,1,2的消息一律接收不到。
* 当clean session的值为true,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息一律接收不到。
* 当clean session的值为false,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条,一条不差,照单全收
*
*/
options.setCleanSession(false);
/*options.setUserName(PropertiesUtil.MQTT_USER_NAME);
options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());*/
options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
try {
client.setCallback(new PushCallback2());
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发布,默认qos为0,非持久化
* @param topic
* @param pushMessage
*/
public void publish(String topic,PushPayload pushMessage){
publish(0, false, topic, pushMessage);
}
/**
* 发布
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.toString().getBytes());
MqttTopic mTopic = client.getTopic(topic);
if(null == mTopic){
System.err.println("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
}
finally {
try {
client.disconnect();
client.close();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 订阅某个主题,qos默认为0
* @param topic
*/
public void subscribe(String topic){
subscribe(topic,0);
}
/**
* 订阅某个主题
* @param topic
* @param qos
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public boolean isConnected() {
return client.isConnected();
}
public static void main(String[] args) throws Exception {
String kdTopic = "demo/topics";
PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("17637900215")
.setContent("designModel")
.bulid();
MqttPushServer.getInstance().publish(0, false, kdTopic, pushMessage);
}
}
配置类
用于读取mqtt的一些配置,服务器采用本地的服务器,不需要配置密码,这里我把账号密码都进行了注释
package com.mqtt.demo2;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * Holds MQTT settings read from the classpath resource {@code /mqtt.properties}.
 *
 * <p>The resource is read once, when this class is initialized. A missing resource, a missing
 * integer property, or a non-numeric integer property aborts class initialization with a message
 * that names the offending resource or key.
 */
public class PropertiesUtil {

    /** Classpath location of the settings file. */
    private static final String RESOURCE = "/mqtt.properties";

    public static String MQTT_HOST;
    public static String MQTT_CLIENTID;
    public static String MQTT_USER_NAME;
    public static String MQTT_PASSWORD;
    public static int MQTT_TIMEOUT;
    public static int MQTT_KEEP_ALIVE;
    public static String prefixUrl;
    /**
     * Maximum number of reconnect attempts.
     */
    public static int MQTT_MAXRECONNECTTIMES;
    public static int MQTT_RECONNINTERVAL;

    static {
        // Read the file once and reuse the result for every key.
        Properties props = loadMqttProperties();
        MQTT_HOST = props.getProperty("host");
        MQTT_CLIENTID = props.getProperty("clientid");
        // MQTT_USER_NAME / MQTT_PASSWORD are intentionally left unset: the tutorial broker needs
        // no credentials. To enable them, read the "username" / "password" keys here.
        MQTT_TIMEOUT = requiredInt(props, "timeout");
        MQTT_KEEP_ALIVE = requiredInt(props, "keepalive");
        MQTT_MAXRECONNECTTIMES = requiredInt(props, "maxReconnectTimes");
        MQTT_RECONNINTERVAL = requiredInt(props, "reconnInterval");
        // String.valueOf keeps the historical behavior: an absent key yields the text "null".
        prefixUrl = String.valueOf(props.getProperty("prefixUrl"));
    }

    /**
     * Loads {@code /mqtt.properties} from the classpath.
     *
     * @return the parsed properties
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws java.io.UncheckedIOException if the resource cannot be read
     */
    private static Properties loadMqttProperties() {
        Properties properties = new Properties();
        try (InputStream inputstream = PropertiesUtil.class.getResourceAsStream(RESOURCE)) {
            if (inputstream == null) {
                throw new IllegalStateException("Classpath resource not found: " + RESOURCE);
            }
            properties.load(inputstream);
            return properties;
        } catch (IOException e) {
            throw new java.io.UncheckedIOException("Failed to read " + RESOURCE, e);
        }
    }

    /**
     * Reads a mandatory integer property, tolerating surrounding whitespace.
     *
     * @param props loaded properties
     * @param key   property name
     * @return the parsed value
     * @throws IllegalStateException if the property is absent, blank, or not an integer
     */
    private static int requiredInt(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property '" + key + "' in " + RESOURCE);
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalStateException(
                    "Property '" + key + "' in " + RESOURCE + " is not an integer: " + raw, e);
        }
    }

    /** Prints the configured client id; handy for checking that the file is found. */
    public static void main(String[] args) {
        System.out.println(PropertiesUtil.MQTT_CLIENTID);
    }
}
mqtt.properties
host=tcp://192.168.11.10:1883
clientid=JavaSample
topic=demo/topics
timeout=10
keepalive=20
maxReconnectTimes=5
reconnInterval=1
prefixUrl=http://192.168.3.93:18083
MQTT QoS支持的三种类型的枚举类
package com.mqtt.demo2;
/**
 * The three MQTT quality-of-service levels, each carrying its numeric level and a
 * human-readable description string.
 */
public enum QosType {

    /** Level 0. */
    QOS_AT_MOST_ONCE(0, "最多一次,有可能重复或丢失"),

    /** Level 1. */
    QOS_AT_LEAST_ONCE(1, "至少一次,有可能重复"),

    /** Level 2. */
    QOS_EXACTLY_ONCE(2, "只有一次,确保消息只到达一次");

    /** Numeric QoS level passed to the MQTT client. */
    private final int level;

    /** Description text associated with the level. */
    private final String text;

    QosType(int level, String text) {
        this.level = level;
        this.text = text;
    }

    /** @return the numeric QoS level */
    public int getNumber() {
        return this.level;
    }

    /** @return the description text for this level */
    public String getDesc() {
        return this.text;
    }
}
推送消息的实体类
package com.mqtt.demo2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
 * Push-notification payload sent over MQTT.
 *
 * <p>{@link #toString()} renders the payload as JSON, which is what the publisher puts on the
 * wire. Instances are normally created through {@link #builder()}.
 */
@Slf4j
@Setter
@Getter
public class PushPayload {
    /** Push type. */
    private String type;
    /** Push target (mobile number). */
    private String mobile;
    /** Title. */
    private String title;
    /** Content. */
    private String content;
    /** Badge count. */
    private Integer badge = 1;
    /** Notification sound. */
    private String sound = "default";

    /**
     * Creates a payload with every field supplied explicitly.
     *
     * @param type    push type
     * @param mobile  push target
     * @param title   title
     * @param content content
     * @param badge   badge count
     * @param sound   notification sound
     */
    public PushPayload(String type, String mobile, String title, String content, Integer badge, String sound) {
        this.type = type;
        this.mobile = mobile;
        this.title = title;
        this.content = content;
        this.badge = badge;
        this.sound = sound;
    }

    /**
     * Fluent builder for {@link PushPayload}. {@code badge} defaults to {@code 1} and
     * {@code sound} to {@code "default"}; the other fields default to {@code null}.
     */
    public static class Builder {
        /** Push type. */
        private String type;
        /** Push target (mobile number). */
        private String mobile;
        /** Title. */
        private String title;
        /** Content. */
        private String content;
        /** Badge count. */
        private Integer badge = 1;
        /** Notification sound. */
        private String sound = "default";

        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        public Builder setMobile(String mobile) {
            this.mobile = mobile;
            return this;
        }

        public Builder setTitle(String title) {
            this.title = title;
            return this;
        }

        public Builder setContent(String content) {
            this.content = content;
            return this;
        }

        public Builder setBadge(Integer badge) {
            this.badge = badge;
            return this;
        }

        public Builder setSound(String sound) {
            this.sound = sound;
            return this;
        }

        /**
         * Creates the payload from the values collected so far.
         *
         * @return a new {@link PushPayload}
         */
        public PushPayload build() {
            return new PushPayload(type, mobile, title, content, badge, sound);
        }

        /**
         * Misspelled alias of {@link #build()}, retained for backward compatibility.
         *
         * @return a new {@link PushPayload}
         */
        public PushPayload bulid() {
            return build();
        }
    }

    /**
     * Starts building a payload.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Misspelled alias of {@link #builder()}, retained for backward compatibility.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder getPushPayloadBuider() {
        return builder();
    }

    /** Renders this payload as a JSON string, with circular-reference detection disabled. */
    @Override
    public String toString() {
        return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect);
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Integer getBadge() {
        return badge;
    }

    public void setBadge(Integer badge) {
        this.badge = badge;
    }

    public String getSound() {
        return sound;
    }

    public void setSound(String sound) {
        this.sound = sound;
    }
}
PushCallback
package com.mqtt.demo2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.concurrent.TimeUnit;
/**
 * MQTT callback installed on the publishing client. It reports connection loss and prints any
 * message that arrives on a subscribed topic.
 */
public class PushCallback2 implements MqttCallback {

    /**
     * Reports the lost connection on standard error instead of dropping the event silently.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.err.println("MQTT connection lost: " + throwable);
    }

    /** No action is taken when delivery of a published message completes. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Intentionally empty: the publisher already waits on the delivery token itself.
    }

    /**
     * Prints the topic, QoS and payload of a message received after a subscribe. The publishing
     * side does not depend on this; it is consumer-side behavior.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        // Decode explicitly as UTF-8 rather than with the JVM's platform default charset.
        String body = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("接收消息内容 : " + body);
    }
}
测试controller
使用main方法测试,这里我将mqtt交给了spring进行维护管理
package com.mqtt.demo2;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that publishes a sample push message over MQTT.
 */
@RestController
public class EMQController {

    /**
     * Publishes a fixed sample payload to {@code demo/topics} with QoS 1 and the retained flag
     * set, waits for the publish task to finish, and prints the outcome.
     *
     * <p>Example: {@code http://127.0.0.1:8888/sendMessage}
     *
     * @return always {@code "ok"}; failures are only reported on the console
     */
    @GetMapping("/sendMessage")
    public String testMQ() {
        String kdTopic = "demo/topics";
        PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("17637900215").setContent("designModel")
                .bulid();
        /*
         * Author's notes on the retained flag:
         *  - A message published with retain=true is kept by the broker and delivered to clients
         *    that subscribe to the topic later; per the author it survives a broker restart.
         *  - Publishing an empty payload with retain=true removes the retained message for the
         *    topic; an empty payload with retain=false does not remove it.
         * QoS levels: 0 = at most once, 1 = at least once, 2 = exactly once (see QosType).
         */
        // The publish runs as a task on an executor; a message queue or Spring's async support
        // would be alternatives for real asynchronous handling.
        FutureTask<Boolean> futureTask = new FutureTask<>(() -> {
            MqttPushServer.getInstance().publish(QosType.QOS_AT_LEAST_ONCE.getNumber(), true, kdTopic, pushMessage);
            return true;
        });
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            service.submit(futureTask);
            if (Boolean.TRUE.equals(futureTask.get())) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息推送异常");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the container thread can observe it.
            Thread.currentThread().interrupt();
            System.out.println("消息推送异常");
            e.printStackTrace();
        } catch (java.util.concurrent.ExecutionException | RuntimeException e) {
            System.out.println("消息推送异常");
            e.printStackTrace();
        } finally {
            // The pool is created per request, so it must be released per request as well.
            service.shutdown();
        }
        return "ok";
    }
}
客户端
package com.mqtt.demo2;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Stand-alone subscriber: connects to the broker, subscribes to {@code demo/topics} with QoS 1
 * and prints every message that arrives. The process keeps running after {@code main} returns
 * because the client is deliberately left connected.
 */
public class Client {

    public static void main(String[] args) {
        String topic = "demo/topics";
        int qos = 1;
        String broker = "tcp://192.168.11.10:1883";
        String clientId = "client1";
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            // Install the callback before connecting/subscribing so that no message delivered
            // right after the subscription is missed.
            sampleClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    System.err.println("MQTT connection lost: " + throwable);
                }

                @Override
                public void messageArrived(String receivedTopic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("mqttMessage=" + mqttMessage);
                    System.out.println("topic=" + receivedTopic);
                    System.out.println();
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    // Nothing is published by this client, so there is nothing to confirm.
                }
            });
            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            sampleClient.subscribe(topic, qos);
            System.out.println("Subscribed to topic: " + topic);
            // No disconnect here: the client must stay connected to keep receiving messages.
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
测试
package com.mqtt.demo2;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
 * Command-line check that publishes one sample payload to {@code demo/topics} with QoS 1 and
 * the retained flag set, then prints whether the publish task completed.
 */
public class Test {

    public static void main(String[] args) {
        String kdTopic = "demo/topics";
        PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("17637900215").setContent("designModel")
                .bulid();
        System.out.println(pushMessage);
        // The publish runs as a task on an executor; a message queue or Spring's async support
        // would be alternatives for real asynchronous handling.
        FutureTask<Boolean> futureTask = new FutureTask<>(() -> {
            MqttPushServer.getInstance().publish(QosType.QOS_AT_LEAST_ONCE.getNumber(), true, kdTopic, pushMessage);
            return true;
        });
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            service.submit(futureTask);
            if (Boolean.TRUE.equals(futureTask.get())) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息推送异常");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            System.out.println("消息推送异常");
            e.printStackTrace();
        } catch (java.util.concurrent.ExecutionException | RuntimeException e) {
            System.out.println("消息推送异常");
            e.printStackTrace();
        } finally {
            // Release the pool so its idle worker thread does not keep the JVM alive.
            service.shutdown();
        }
    }
}
关于作者
王硕,网名信平,十多年软件开发经验,业余架构师,精通Java/Python/Go等,喜欢研究技术,著有《PyQt 5 快速开发与实战》《Python 3.* 全栈开发》,多个业余开源项目托管在GitHub上,欢迎微博交流。