MQTT+ActiveMQ实现消息推送(服务器端java实现)
上一篇文章已经介绍了mqtt+activemq实现消息推送移动端的实现,也介绍了利用自带的web console进行消息发布的方法。但是在具体的项目应用中,当我们将需要将该消息推送模块嵌入到一个后台管理系统当中,我们就需要在web端来访问activeMQ来进行消息的发布。按照惯例,先上项目的地址 https://github.com/nymar123/Publisher同样用的是org.eclip
文章共918字 · 阅读需要大约4分钟
一键AI生成摘要,助你高效阅读
问答
·
上一篇文章已经介绍了mqtt+activemq实现消息推送移动端的实现,也介绍了利用自带的web console进行消息发布的方法。但是在具体的项目应用中,当我们将需要将该消息推送模块嵌入到一个后台管理系统当中,我们就需要在web端来访问activeMQ来进行消息的发布。
按照惯例,先上项目的地址 https://github.com/nymar123/Publisher
同样用的是org.eclipse.paho.client.mqttv3包,相信看过上一篇文章已经有了一定的了解,直接来看代码
@Controller
public class HelloController{
private static final String HOST = "tcp://127.0.0.1:1883";
private String TOPIC;
private String MESSAGE;
private static final String clientid = "server";
private MqttClient client;
private MqttTopic topic;
private String userName = "admin";
private String passWord = "password";
private MqttMessage message;
@RequestMapping(value="/redirect")
public String doPublish(HttpServletRequest request) throws MqttException {
//获取前台传过来的两个参数
TOPIC=request.getParameter("topic");
MESSAGE=request.getParameter("message");
//new mqttClient
//MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
//与activeMQ连接的方法
connect();
//new mqttMessage
message = new MqttMessage();
//设置服务质量
message.setQos(2);
//设置是否在服务器中保存消息体
message.setRetained(true);
//设置消息的内容
message.setPayload(MESSAGE.getBytes());
//发布
publish(topic, message);
System.out.println("已发送");
return "result";
}
private void connect() {
// new mqttConnection 用来设置一些连接的属性
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
// 换而言之,设置为false时可以客户端可以接受离线消息
options.setCleanSession(false);
// 设置连接的用户名和密码
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 设置超时时间
options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
try {
// 设置回调类
client.setCallback(new PushCallback());
// 连接
client.connect(options);
// 获取activeMQ上名为TOPIC的topic
topic = client.getTopic(TOPIC);
} catch (Exception e) {
e.printStackTrace();
}
}
public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException {
// 发布的方法
// new mqttDeliveryToken
MqttDeliveryToken token = topic.publish(message);
// 发布
token.waitForCompletion();
System.out.println("message is published completely! "
+ token.isComplete());
}
}
简陋的效果图
更多推荐
已为社区贡献1条内容
所有评论(0)