Spring Boot2 创建

Reads: 471 Edit

创建springBoot应用

访问 http://start.spring.io/

引入web模块

pom.xml中添加支持web模块

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
 </dependency>

热部署

Springboot2.*的热部署配置

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
</dependency>

单元测试

1,在pom包中添加spring-boot-starter-test包引用

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

2,开发测试类 在测试类的类头部需要添加:@RunWith(SpringRunner.class)和@SpringBootTest注解,在测试方法的顶端添加@Test即可,最后在方法上点击右键run就可以运行。

@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTest {
	Logger logger = LoggerFactory.getLogger(AppTest.class);
	
	@Autowired
	private IHelloService helloService;
	
	@Test
	public void test1(){
		logger.info("--- test helloService=" + helloService);
		 helloService.sayHello();
	}
	
}

整合springboot的代码

所需要的依赖

<!--mqtt-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.66</version>
		</dependency>

消息publicsh端代码实现

mqtt发送消息的核心类,创建连接使用了单例的方式,

package com.mqtt.demo2;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;


import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class MqttPushServer {
	private MqttClient client;
	 
    private static volatile MqttPushServer mqttPushClient = null;
    //重连次数
    private int reConnTimes;
 
    public int getReConnTimes() {
        return this.reConnTimes;
    }
 
    public void setReConnTimes(int reConnTimes) {
        if (this.isConnected()) {
            reConnTimes = 0;
        }
 
        this.reConnTimes = reConnTimes;
    }
 
    public int getMaxReconnTimes() {
        return PropertiesUtil.MQTT_MAXRECONNECTTIMES;
    }
 
    public int getReconnInterval() {
        return PropertiesUtil.MQTT_RECONNINTERVAL;
    }
 
    public static MqttPushServer getInstance(){
 
        if(null == mqttPushClient){
            synchronized (MqttPushServer.class){
                if(null == mqttPushClient){
                    mqttPushClient = new MqttPushServer();
                }
            }
 
        }
        return mqttPushClient;
 
    }
 
    private MqttPushServer() {
        connect();
    }
 
    public void connect(){
        try {
            client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            /**
             * clean session 值为false,既保留会话,那么该客户端上线的时候,并订阅了主题“r”,那么该主题会一直存在,即使客户端离线,该主题也仍然会记忆在EMQ服务器内存。
             * 当客户端离线又上线时,仍然会接受到离线期间别人发来的publish消息(QOS=0,1,2).类似及时通讯软件,终端可以接受离线消息。
             * 除非客户端主动取消订阅主题, 否则主题一直存在。另外,mnesia不会持久化session,subscription和topic,服务器重启则丢失。
             * 当clean session 为true
             * 该客户端上线,并订阅了主题“r”,那么该主题会随着客户端离线而删除。
             * 当客户端离线又上线时,接受不到离线期间别人发来的publish消息
             *
             * 不管clean session的值是什么,当终端设备离线时,QoS=0,1,2的消息一律接收不到。
             * 当clean session的值为true,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息一律接收不到。
             * 当clean session的值为false,当终端设备离线再上线时,离线期间发来QoS=0,1,2的消息仍然可以接收到。如果同个主题发了多条就接收多条,一条不差,照单全收
             *
             */
            options.setCleanSession(false);
            /*options.setUserName(PropertiesUtil.MQTT_USER_NAME);
            options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());*/
            options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
            options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
            try {
                client.setCallback(new PushCallback2());
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
 
    /**
     * 发布,默认qos为0,非持久化
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic,PushPayload pushMessage){
        publish(0, false, topic, pushMessage);
    }
 
    /**
     * 发布
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.toString().getBytes());
        MqttTopic mTopic = client.getTopic(topic);
        if(null == mTopic){
            System.err.println("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
        	try {
				client.disconnect();
				client.close();
			} catch (MqttException e) {
				e.printStackTrace();
			}
        	
        }
    }
 
    /**
     * 订阅某个主题,qos默认为0
     * @param topic
     */
    public void subscribe(String topic){
        subscribe(topic,0);
    }
 
    /**
     * 订阅某个主题
     * @param topic
     * @param qos
     */
    public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    public boolean isConnected() {
        return client.isConnected();
    }
 
    public static void main(String[] args) throws Exception {
        String kdTopic = "demo/topics";
        PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("17637900215")
                .setContent("designModel")
                .bulid();
        MqttPushServer.getInstance().publish(0, false, kdTopic, pushMessage);
 
    }

}

配置类

用于读取mqtt的一些配置,服务器采用本的服务器不需要配置密码,这里我把账号密码都进行了注释

package com.mqtt.demo2;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {
	public static String MQTT_HOST;
	public static String MQTT_CLIENTID;
	public static String MQTT_USER_NAME;
	public static String MQTT_PASSWORD;
	public static int MQTT_TIMEOUT;
	public static int MQTT_KEEP_ALIVE;
	public static String prefixUrl;
	/**
	 * 最大重连次数
	 */
	public static int MQTT_MAXRECONNECTTIMES;
	public static int MQTT_RECONNINTERVAL;

	static {
		MQTT_HOST = loadMqttProperties().getProperty("host");
		MQTT_CLIENTID = loadMqttProperties().getProperty("clientid");
		/*
		 * MQTT_USER_NAME = loadMqttProperties().getProperty("username"); 
		 * MQTT_PASSWORD = loadMqttProperties().getProperty("password");
		 */
		MQTT_TIMEOUT = Integer.valueOf(loadMqttProperties().getProperty("timeout"));
		MQTT_KEEP_ALIVE = Integer.valueOf(loadMqttProperties().getProperty("keepalive"));
		MQTT_MAXRECONNECTTIMES = Integer.valueOf(loadMqttProperties().getProperty("maxReconnectTimes"));
		MQTT_RECONNINTERVAL = Integer.valueOf(loadMqttProperties().getProperty("reconnInterval"));
		prefixUrl = String.valueOf(loadMqttProperties().getProperty("prefixUrl"));
	}

	private static Properties loadMqttProperties() {
		InputStream inputstream = PropertiesUtil.class.getResourceAsStream("/mqtt.properties");
		Properties properties = new Properties();
		try {
			properties.load(inputstream);
			return properties;
		} catch (IOException e) {
			throw new RuntimeException(e);
		} finally {
			try {
				if (inputstream != null) {
					inputstream.close();
				}
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
		}
	}

	public static void main(String[] args) {
		System.out.println(PropertiesUtil.MQTT_CLIENTID);
	}

}

mqtt.properties

host=tcp://192.168.11.10:1883
clientid=JavaSample
topic=demo/topics
timeout=10
keepalive=20
maxReconnectTimes=5
reconnInterval=1
prefixUrl=http://192.168.3.93:18083

mqttqos支持的三种类型的枚举类

package com.mqtt.demo2;

public enum QosType {
	QOS_AT_MOST_ONCE(0, "最多一次,有可能重复或丢失"),
	QOS_AT_LEAST_ONCE(1, "至少一次,有可能重复"), 
	QOS_EXACTLY_ONCE(2, "只有一次,确保消息只到达一次");
	
	private int number;
	private String desc;

	QosType(int num, String desc) {
		this.number = num;
		this.desc = desc;
	}

	public int getNumber() {
		return number;
	}

	public String getDesc() {
		return desc;
	}

}

推送消息的实体类

package com.mqtt.demo2;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Setter
@Getter
public class PushPayload {
	  //推送类型
    private String type;
    //推送对象
    private String mobile;
    //标题
    private String title;
    //内容
    private String content;
    //数量
    private Integer badge = 1;
    //铃声
    private String sound = "default";
    public PushPayload(String type, String mobile, String title, String content, Integer badge , String sound){
        this.type = type;
        this.mobile = mobile;
        this.title = title;
        this.content = content;
        this.badge = badge;
        this.sound = sound;
    }
 
    public static class Builder{
        //推送类型
        private String type;
        //推送对象
        private String mobile;
        //标题
        private String title;
        //内容
        private String content;
        //数量
        private Integer badge = 1;
        //铃声
        private String sound = "default";
 
        public Builder setType(String type) {
            this.type = type;
            return this;
        }
 
        public Builder setMobile(String mobile) {
            this.mobile = mobile;
            return this;
        }
 
        public Builder setTitle(String title) {
            this.title = title;
            return this;
        }
 
        public Builder setContent(String content) {
            this.content = content;
            return this;
        }
 
        public Builder setBadge(Integer badge) {
            this.badge = badge;
            return this;
        }
 
        public Builder setSound(String sound) {
            this.sound = sound;
            return this;
        }
 
        public PushPayload bulid(){
            return new PushPayload(type,mobile,title,content,badge,sound);
        }
    }
 
 
    public static Builder getPushPayloadBuider(){
        return new Builder();
    }
 
 
    @Override
    public String toString() {
        return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect);
    }


	public String getType() {
		return type;
	}


	public void setType(String type) {
		this.type = type;
	}


	public String getMobile() {
		return mobile;
	}


	public void setMobile(String mobile) {
		this.mobile = mobile;
	}


	public String getTitle() {
		return title;
	}


	public void setTitle(String title) {
		this.title = title;
	}


	public String getContent() {
		return content;
	}


	public void setContent(String content) {
		this.content = content;
	}


	public Integer getBadge() {
		return badge;
	}


	public void setBadge(Integer badge) {
		this.badge = badge;
	}


	public String getSound() {
		return sound;
	}


	public void setSound(String sound) {
		this.sound = sound;
	}
    
    
    
}

PushCallback

package com.mqtt.demo2;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.concurrent.TimeUnit;

public class PushCallback2 implements MqttCallback {
	@Override
	public void connectionLost(Throwable throwable) {
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
	}

	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		// 服务端不用关心,客户端的业务
		// subscribe后得到的消息会执行到这里面
		System.out.println("接收消息主题 : " + topic);
		System.out.println("接收消息Qos : " + message.getQos());
		System.out.println("接收消息内容 : " + new String(message.getPayload()));

	}
}

测试controller

使用main方法测试,这里我将mqtt交给了spring进行维护管理

package com.mqtt.demo2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EMQController {
	// http://127.0.0.1:8888/sendMessage
	@GetMapping("/sendMessage")
	public String testMQ() {
		String kdTopic = "demo/topics";
		PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("17637900215").setContent("designModel")
				.bulid();
		/**
		 * mqtt发布消息的时候,可以设置保留消息标志,保留消息会驻留在消息服务器,后来的订阅主题仍然可以接受该消息 关于retain的说明:
		 * 终端设备publish消息时,如果retain值是true,则会服务器一直记忆,哪怕是服务重启。因为Mnesia会本地持久化。
		 * publish某主题的消息,payload为空且retain值是true,则会删除这条持久化的消息。
		 *
		 * publish某主题的消息,payload为空且retain值是false,则不会删除这条持久化的消息。 QOS_AT_MOST_ONCE(0,
		 * "最多一次,有可能重复或丢失"), QOS_AT_LEAST_ONCE(1, "至少一次,有可能重复"), QOS_EXACTLY_ONCE(2,
		 * "只有一次,确保消息只到达一次");
		 */
		// 这里将消息异步处理 使用futuretask,或者使用rabbimq进行异步处理或者spring的异步机制进行处理
		FutureTask futureTask = new FutureTask(() -> {
			MqttPushServer.getInstance().publish(QosType.QOS_AT_LEAST_ONCE.getNumber(), true, kdTopic, pushMessage);
			return true;
		});
		ExecutorService service = Executors.newCachedThreadPool();
		service.submit(futureTask);
		try {
			Boolean result = (Boolean) futureTask.get();
			if (result == true) {
				System.out.println("消息发送成功");
			} else {
				System.out.println("消息推送异常");
			}
		} catch (Exception e) {
			System.out.println("消息推送异常");
			e.printStackTrace();
		}
		return "ok";
	}
}

客户端

package com.mqtt.demo2;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Client {

	public static void main(String[] args) {
		String topic = "demo/topics";
		String content = "Message from MqttPublishSample";
		int qos = 1;
		String broker = "tcp://192.168.11.10:1883";
		String clientId = "client1";
		MemoryPersistence persistence = new MemoryPersistence();

		try {
			MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
			MqttConnectOptions connOpts = new MqttConnectOptions();
			connOpts.setCleanSession(true);
			System.out.println("Connecting to broker: " + broker);
			sampleClient.connect(connOpts);
			System.out.println("Connected");
			System.out.println("Publishing message: " + content);
			MqttMessage message = new MqttMessage(content.getBytes());
			message.setQos(qos);
			sampleClient.subscribe("demo/topics", 1);
			sampleClient.setCallback(new MqttCallback() {
				@Override
				public void connectionLost(Throwable throwable) {

				}

				@Override
				public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
					System.out.println("mqttMessage=" + mqttMessage);
					System.out.println("topic=" + topic);
					System.out.println();
				}

				@Override
				public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

				}
			});

//			System.out.println("Message published");
//			sampleClient.disconnect();
//			System.out.println("Disconnected");
			// System.exit(0);

		} catch (MqttException me) {
			System.out.println("reason " + me.getReasonCode());
			System.out.println("msg " + me.getMessage());
			System.out.println("loc " + me.getLocalizedMessage());
			System.out.println("cause " + me.getCause());
			System.out.println("excep " + me);
			me.printStackTrace();
		}

	}

}

测试

package com.mqtt.demo2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class Test {

	public static void main(String[] args) {
		String kdTopic = "demo/topics";
		PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("17637900215").setContent("designModel")
				.bulid();
		System.out.println(pushMessage);

        //这里将消息异步处理  使用futuretask,或者使用rabbimq进行异步处理或者spring的异步机制进行处理
        FutureTask futureTask = new FutureTask(() -> {
            MqttPushServer.getInstance().publish(QosType.QOS_AT_LEAST_ONCE.getNumber(), true, kdTopic, pushMessage);
            return true;
        });
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(futureTask);
        try {
            Boolean result = (Boolean) futureTask.get();
            if (result == true) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息推送异常");
            }
        } catch (Exception e) {
            System.out.println("消息推送异常");
            e.printStackTrace();
        }
	}

}

关于作者

王硕,网名信平,十多年软件开发经验,业余架构师,精通Java/Python/Go等,喜欢研究技术,著有《PyQt 5 快速开发与实战》《Python 3.* 全栈开发》,多个业余开源项目托管在GitHub上,欢迎微博交流。


Comments

Make a comment

www.ultrapower.com ,王硕的博客,专注于研究互联网产品和技术,提供中文精品教程。 本网站与其它任何公司及/或商标无任何形式关联或合作。