13e77f2b98c22896ebc80c119652a3d0.png

思路

因为暂时没有发现 HarmonyOS 上现成可用的 MQTT JS 包,所以使用 Java 接收 MQTT 消息,再由 JS 定时调用 Java 获取已接收到的消息并展示

首先是JS调用Java,JS FA(Feature Ability)调用Java PA(Particle Ability)有两种方式,Ability和Internal Ability,这里使用的是第一种Ability

然后是Java端的Mqtt消息接收,使用paho的第三方库进行消息接收,页面启动时JS端调用Java端实现Mqtt消息接收开始,使用异步挂起,接收消息并缓存,随后JS端每次调用Java端拿到的都是最新缓存的信息

具体代码

hml页面:

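<div class="container">

<text class="title">{{ title }}</text>

<button onclick="mqttMessage">开始mqtt</button>

<button onclick="stopMqtt">停止mqtt</button>

</div>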

JS代码:

// abilityType passed to FeatureAbility.callAbility; the article states the "Ability" calling mode is used
const ABILITY_TYPE_EXTERNAL = 0;

// syncOption passed to FeatureAbility.callAbility (synchronous, going by the constant's name)
const ACTION_SYNC = 0;

// Message code that asks the Java side to start receiving MQTT; must equal START_MQTT in PlayAbility
const ACTION_MESSAGE_CODE_START_MQTT = 1001;

// Message code that fetches the latest cached MQTT message; must equal MQTT_MESSAGE in PlayAbility
const ACTION_MESSAGE_CODE_MQTT_MESSAGE = 1002;

// Bundle that contains the Java ability being called
const BUNDLE_NAME = 'com.example.mqttapplication';

// Fully qualified class name of the Java ability being called
const ABILITY_NAME = 'com.example.mqttapplication.PlayAbility';

/**
 * Builds the request object for FeatureAbility.callAbility.
 * Every call to the Java PlayAbility differs only in its message code.
 *
 * @param messageCode one of the ACTION_MESSAGE_CODE_* constants
 * @returns the request object expected by FeatureAbility.callAbility
 */
function buildPlayAbilityRequest(messageCode) {
    return {
        messageCode: messageCode,
        abilityType: ABILITY_TYPE_EXTERNAL,
        syncOption: ACTION_SYNC,
        bundleName: BUNDLE_NAME,
        abilityName: ABILITY_NAME
    };
}

/**
 * JS-side wrapper around the Java PlayAbility.
 */
export const playAbility = {

    /**
     * Asks the Java side to start receiving MQTT messages.
     *
     * The reply is awaited and checked so that a failed start is logged
     * instead of being silently dropped.
     *
     * @returns the parsed reply ({code, abilityResult}), or undefined when
     *          the call itself failed
     */
    startMqtt: async function() {
        try {
            var result = await FeatureAbility.callAbility(
                buildPlayAbilityRequest(ACTION_MESSAGE_CODE_START_MQTT));
            var ret = JSON.parse(result);
            if (ret.code != 0) {
                console.error('mqtt start error code:' + JSON.stringify(ret.code));
            }
            return ret;
        } catch (e) {
            console.error('mqtt start failed:' + e);
            return undefined;
        }
    },

    /**
     * Fetches the latest MQTT message cached on the Java side and writes it
     * to `that.title`.
     *
     * @param that page instance whose `title` property is updated
     */
    mqttMessage: async function(that) {
        var ret;
        try {
            var result = await FeatureAbility.callAbility(
                buildPlayAbilityRequest(ACTION_MESSAGE_CODE_MQTT_MESSAGE));
            ret = JSON.parse(result);
        } catch (e) {
            // This runs on a timer; a rejected call or an unparsable reply must
            // not surface as an unhandled promise rejection on every tick.
            console.error('mqtt call failed:' + e);
            return;
        }

        if (ret.code == 0) {
            console.info('mqtt is:' + JSON.stringify(ret.abilityResult));
            that.title = 'mqtt is:' + JSON.stringify(ret.abilityResult);
        } else {
            console.error('mqtt error code:' + JSON.stringify(ret.code));
        }
    }
}

/**
 * Page view-model: shows the latest MQTT message in `title` and polls the
 * Java side every 200 ms while polling is active.
 */
export default {
    data: {
        // Text shown on the page
        title: "",
        // Handle returned by setInterval while polling is active, otherwise null
        timer: null
    },

    /** Performs one poll: fetches the latest cached message and updates the title. */
    task() {
        playAbility.mqttMessage(this);
    },

    /** Starts polling; bound to the "start" button. */
    mqttMessage() {
        this.title = "开始获取MQTT消息";
        // Stop any polling already in progress, otherwise a second press would
        // overwrite the handle and leave the first interval running forever.
        this.stopMqtt();
        this.task();
        // The arrow function keeps `this` pointing at the page instance no
        // matter how the framework binds methods.
        this.timer = setInterval(() => this.task(), 200);
    },

    /** Stops polling; bound to the "stop" button. Safe to call when not polling. */
    stopMqtt() {
        if (this.timer !== null) {
            clearInterval(this.timer);
            this.timer = null;
        }
    },

    /** Page lifecycle hook: make sure the interval does not outlive the page. */
    onDestroy() {
        this.stopMqtt();
    }
}

// Initialise MQTT message reception on the Java side when this module is loaded

playAbility.startMqtt()

Java端代码(接收Mqtt消息,异步)

import org.eclipse.paho.client.mqttv3.*;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.List;

publicclass MqttThread implements Runnable {

/**地址*/

publicstaticfinal String MQTT_BROKER_HOST ="tcp://xxx.xxx.xxx.xxx:1883";

/**客户端唯一标识*/

publicstaticfinal String MQTT_CLIENT_ID ="client";

/**订阅标识*/

publicstaticfinal String MQTT_TOPIC ="HarmonyTest";

/**客户端*/

private volatile staticMqttClient mqttClient;

/**连接选项*/

private staticMqttConnectOptions options;

/**消息*/

private final List message;

publicMqttThread(List message) {

this.message = message;

}

publicvoid run() {

try {

mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());

options = new MqttConnectOptions();

options.setCleanSession(true);

options.setConnectionTimeout(20);

options.setKeepAliveInterval(20);

mqttClient.connect(options);

mqttClient.subscribe(MQTT_TOPIC);

mqttClient.setCallback(new MqttCallback() {

@Override

publicvoid connectionLost(Throwable throwable) { }

@Override

publicvoid messageArrived(String s, MqttMessage mqttMessage) {

message.clear();

message.add(mqttMessage.toString());

System.out.println("接收到mqtt消息:"+ mqttMessage.toString());

}

@Override

publicvoid deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { }

});

} catch (Exception e) {

e.printStackTrace();

}

}

}

Java端代码(Particle Ability)

import com.example.mqttapplication.mqtt.MqttThread;

import ohos.aafwk.ability.Ability;

import ohos.aafwk.content.Intent;

import ohos.hiviewdfx.HiLog;

import ohos.hiviewdfx.HiLogLabel;

import ohos.rpc.*;

import ohos.utils.zson.ZSONObject;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

publicclass PlayAbility extends Ability {

staticfinal HiLogLabel label = new HiLogLabel(HiLog.LOG_APP, 1,"MY_TAG");

private staticfinalintERROR = -1;

private staticfinalintSUCCESS = 0;

private staticfinalintSTART_MQTT = 1001;

private staticfinalintMQTT_MESSAGE = 1002;

@Override

protected void onStart(Intent intent) {

super.onStart(intent);

}

@Override

protected IRemoteObject onConnect(Intent intent) {

super.onConnect(intent);

PlayRemote remote = new PlayRemote();

returnremote.asObject();

}

staticclass PlayRemote extends RemoteObject implements IRemoteBroker {

private List message;

private Thread thread;

publicPlayRemote() {

super("PlayRemote");

}

@Override

publicboolean onRemoteRequest(intcode, MessageParcel data, MessageParcel reply, MessageOptionoption) {

// 开始mqtt

elseif (code == START_MQTT) {

Map result = new HashMap<>();

result.put("code", SUCCESS);

result.put("abilityResult","成功开始mqtt");

try {

message = new ArrayList<>();

MqttThread mqttThread = new MqttThread(message);

thread = new Thread(mqttThread);

thread.start();

System.out.println("mqtt启动成功");

}

catch (Exception e) {

result.put("code", ERROR);

result.put("abilityResult","启动失败");

}

reply.writeString(ZSONObject.toZSONString(result));

}

// 获取mqtt消息

elseif (code == MQTT_MESSAGE) {

Map result = new HashMap<>();

result.put("code", SUCCESS);

if (message.isEmpty()) {

result.put("abilityResult","未接收到MQTT消息");

}

else{

ZSONObject zsonObject = ZSONObject.stringToZSON(message.get(0));

result.put("abilityResult", zsonObject.getString("message"));

}

reply.writeString(ZSONObject.toZSONString(result));

}

else{

Map result = new HashMap<>();

result.put("abilityError", ERROR);

reply.writeString(ZSONObject.toZSONString(result));

returnfalse;

}

returntrue;

}

@Override

publicIRemoteObject asObject() {

returnthis;

}

}

}

另外启动网络连接还需要往config.json里加点东西获取权限

{

...

"module": {

...

"reqPermissions": [

{

"name":"ohos.permission.GET_NETWORK_INFO"

},

{

"name":"ohos.permission.INTERNET"

},

{

"name":"ohos.permission.SET_NETWORK_INFO"

},

{

"name":"ohos.permission.MANAGE_WIFI_CONNECTION"

},

{

"name":"ohos.permission.SET_WIFI_INFO"

},

{

"name":"ohos.permission.GET_WIFI_INFO"

}

]

}

}

最后写了个 Python 脚本用来发送 MQTT 消息,很简单,只有两行

# Publish one test message to the topic the HarmonyOS app subscribes to.
import paho.mqtt.publish as publish

publish.single('HarmonyTest', '{"message":"BongShakalaka"}', hostname='xxx.xxx.xxx.xxx')

附:mqtt消息是要有mqtt服务器的,这个就自己搭或者买吧

【编辑推荐】

【责任编辑:jianghua TEL:(010)68476606】

点赞 0

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐