[物联网]Java如何对接MQTT物联网协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、
·
什么是MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
Java如何对接
- 创建对应的用户并提供权限
String res = HttpUtils.doPostJson(URL + "/api/v4/auth_username", JacksonUtils.objectToJson(params), headers);
String res1 = HttpUtils.doPostJson(URL+"/api/v4/acl", JacksonUtils.objectToJson(params1), headers);
- MQTT配置
@Bean
public MqttPahoClientFactory clientFactory() {
final MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{hostUrls});
options.setUserName(username);
options.setPassword(password.toCharArray());
final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(options);
return factory;
}
@Bean(name = OUTBOUND_CHANNEL)
public MessageChannel messageChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler mqttOutbound() {
// 初始化出站通道适配器,使用的是Eclipse Paho MQTT客户端库
final MqttPahoMessageHandler handler = new MqttPahoMessageHandler("outbound_"+clientId, clientFactory());
// 设置默认的服务质量 0:只发一次消息,对方可能收不到 1:保证对方收到消息,有可能重复收到消息 2:保证对方收到消息且只收一次,耗资源
handler.setDefaultQos(0);
// 设置保留消息
handler.setDefaultRetained(true);
// 设置异步发送,默认是false(发送时阻塞)
handler.setAsync(true);
return handler;
}
/**
* MQTT消息接收处理
*/
@Bean(name = INPUT_CHANNEL)
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 配置client,监听的topic
*/
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("inbound_" + clientId, clientFactory(), TOPIC);
adapter.setCompletionTimeout(3000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 通过通道获取数据
*
* @return
*/
@Bean
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public MessageHandler handler() {
return message -> {
String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
log.info("topic: {}", topic);
log.info("payload: {}", message.getPayload().toString());
mqttHandleService.handle(topic,message.getPayload().toString());
};
}
- 配置完成后根据Topc进行解析数据
public void handle(String topic, String toString) {
switch (topic) {
case "topic1":
break;
default:
break;
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)