loraserver 源码解析 (六) lora-app-server
目录 下载源码升级 npm安装一些必要的依赖库pq_trgm extensionrun 调用 handleDataDownPayloads 开启一个Goroutine G1run再调用 startApplicationServerAPI 开启一个Goroutine G2run 又调用 startGatewayPing 开启一个Goroutine G3接下来的s...
目录
run 调用 handleDataDownPayloads 开启一个Goroutine G1
run再调用 startApplicationServerAPI 开启一个Goroutine G2
run 又调用 startGatewayPing 开启一个Goroutine G3
接下来的startJoinServerAPI()开启Goroutine G4
下载源码
go get -u github.com/brocaar/lora-app-server
升级 npm
npm是nodejs 的包管理工具, lora-app-server有些网页是js写的,所以需要
下面这行命令 在我电脑的emacs eshell 下运行不行,那就在 terminal 下执行吧
sudo npm i -g npm
安装一些必要的依赖库
npm install -g create-react-app
切换到 ui 目录下后
sudo npm install react-scripts
lora-app-server的依赖库,参考之前的几篇文章,这里就不细讲了。
反正编译的时候缺啥,就 go get 下啥
我make build 后,提示缺了下面这些
api/common.pb.go:9:8: cannot find package "github.com/brocaar/loraserver/api/common" in any of:
/home/wjs/go/src/github.com/brocaar/loraserver/api/common (from $GOROOT)
/home/wjs/go/gopath/src/github.com/brocaar/loraserver/api/common (from $GOPATH)
internal/storage/user.go:14:2: cannot find package "github.com/dgrijalva/jwt-go" in any of:
/home/wjs/go/src/github.com/dgrijalva/jwt-go (from $GOROOT)
/home/wjs/go/gopath/src/github.com/dgrijalva/jwt-go (from $GOPATH)
cmd/lora-app-server/cmd/root_run.go:18:2: cannot find package "github.com/gorilla/mux" in any of:
/home/wjs/go/src/github.com/gorilla/mux (from $GOROOT)
/home/wjs/go/gopath/src/github.com/gorilla/mux (from $GOPATH)
internal/handler/influxdbhandler/influxdb_handler.go:16:2: cannot find package "github.com/mmcloughlin/geohash" in any of:
/home/wjs/go/src/github.com/mmcloughlin/geohash (from $GOROOT)
/home/wjs/go/gopath/src/github.com/mmcloughlin/geohash (from $GOPATH)
internal/codec/custom_js.go:11:2: cannot find package "github.com/robertkrimen/otto" in any of:
/home/wjs/go/src/github.com/robertkrimen/otto (from $GOROOT)
/home/wjs/go/gopath/src/github.com/robertkrimen/otto (from $GOPATH)
cmd/lora-app-server/cmd/root_run.go:27:2: cannot find package "github.com/tmc/grpc-websocket-proxy/wsproxy" in any of:
/home/wjs/go/src/github.com/tmc/grpc-websocket-proxy/wsproxy (from $GOROOT)
/home/wjs/go/gopath/src/github.com/tmc/grpc-websocket-proxy/wsproxy (from $GOPATH)
依次go get -u -v 即可
搞完后make build
build目录下生成了 lora-app-server
生成 配置文件
lora-app-server configfile > lora-app-server.toml
打开 lora-app-server.toml
删除第一行
创建合适的数据库
可以参考 ubuntu 16.04 安装 postgresql 10 并 配置成loraserver需要的
这次要生成的是 loraserver_as 上次是 loraserver_ns
as: app-server ns network-server 也就是LoRa Wan 的as ns
wjs@wjs:~$ su
Password:
root@wjs:/home/wjs# su postgres
postgres@wjs:/home/wjs$ psql
psql (10.4 (Ubuntu 10.4-2.pgdg16.04+1))
Type "help" for help.
postgres=# create database loraserver_as owner dbuser;
CREATE DATABASE
postgres=# GRANT ALL PRIVILEGES ON DATABASE loraserver_as TO dbuser;
GRANT
打开 lora-app-server.toml 修改下 数据库的dsn 如下
dsn="postgres://dbuser:123456@localhost/loraserver_as?sslmode=disable"
pq_trgm extension
You also need to enable the pg_trgm
(trigram) extension. Example to enable this extension (assuming your LoRa App Server database is named loraserver_as
):
Start the PostgreSQL prompt as the postgres
user:
sudo -u postgres psql
Within the PostgreSQL prompt, enter the following queries:
-- change to the LoRa App Server database
\c loraserver_as
-- enable the extension
create extension pg_trgm;
-- exit the prompt
\q
我们执行下面命令,利用openssl生成server.crt(证书)和server.key(私钥)
openssl genrsa -out server.key 2048
openssl req -new -x509 -key server.key -out server.crt -days 365
Country Name (2 letter code) [AU]:ch
State or Province Name (full name) [Some-State]:gz
Locality Name (eg, city) []:gz
Organization Name (eg, company) [Internet Widgits Pty Ltd]:zlg
Organizational Unit Name (eg, section) []:zlg
Common Name (e.g. server FQDN or YOUR name) []:wjs
Email Address []:xxxx@xxx.cn
用ls命令可以看到生成的文件
~/go/gopath/src/github.com/brocaar/lora-app-server/build $ ls
lora-app-server lora-app-server.toml server.crt server.key
然后把 server.crt 和 server.key 配置到 lora-app-server.toml 文件中。
# Settings for the "external api"
#
# This is the API and web-interface exposed to the end-user.
[application_server.external_api]
# ip:port to bind the (user facing) http server to (web-interface and REST / gRPC api)
bind="0.0.0.0:8080"
# http server TLS certificate
tls_cert="server.crt"
# http server TLS key
tls_key="server.key"
# JWT secret used for api authentication / authorization
# You could generate this by executing 'openssl rand -base64 32' for example
jwt_secret="123456"
# when set, existing users can't be re-assigned (to avoid exposure of all users to an organization admin)"
disable_assign_existing_users=false
运行程序 lora-app-server 大功告成 :)
~/go/gopath/src/github.com/brocaar/lora-app-server/build $ lora-app-server
INFO[0000] starting LoRa App Server docs="https://www.loraserver.io/" version=2.0.0-alpha.1-3-gb9999d3
INFO[0000] connecting to postgresql
INFO[0000] setup redis connection pool
INFO[0000] handler/mqtt: TLS config is empty
INFO[0000] handler/mqtt: connecting to mqtt broker server="tcp://localhost:1883"
INFO[0000] applying database migrations
INFO[0000] handler/mqtt: connected to mqtt broker
INFO[0000] handler/mqtt: subscribing to tx topic qos=0 topic=application/+/device/+/tx
INFO[0000] migrations applied count=0
INFO[0000] starting application-server api bind="0.0.0.0:8001" ca-cert= tls-cert= tls-key=
INFO[0000] starting join-server api bind="0.0.0.0:8003" ca_cert= tls_cert= tls_key=
INFO[0000] starting client api server bind="0.0.0.0:8080" tls-cert=server.crt tls-key=server.key
INFO[0000] registering rest api handler and documentation endpoint path=/api
与loraserver 和 lora-gateway-bridge 一样,lora-app-server也是由cobra完成命令行及配置处理的。
因此lora-app-server的核心在 root_run.go的run函数。
run 调用 handleDataDownPayloads 开启一个Goroutine G1
G1 不断侦听来自于 config.C.ApplicationServer.Integration.Handler 的 DataDownChan() 管道消息,一有消息就开启一个新的routine G1G 来处理,以保证G1不阻塞。 在调用 handleDataDownPayloads() 前,run已经调用了 mqtthandler.NewHandler()。mqtthandler.NewHandler() 和mqtt建立连接并订阅相关的mqtt话题。 当mqtt downlinkTopic 话题有消息到来时,消息被传递到 DataDownChan() 进而被 G1 处理。
run再调用 startApplicationServerAPI 开启一个Goroutine G2
G2 开启app-server的基于tcp的 grpc 接口。
run 又调用 startGatewayPing 开启一个Goroutine G3
G3 死循环,每隔1秒醒来一次,找出开启发现协议的GateWay, 生成发现协议ping包.,通过grpc 把这些ping包发给这些gateway对应的 ns 。回顾 loraserver 源码解析 (五) loraserver, loraserver的G1 会处理这些ping包,进而转发给gateway
接下来的startJoinServerAPI()开启Goroutine G4
如果配置了tls就创建一个 tls的 http 服务,否则创建不加密的http服务。
G4 通过http协议接收到来自loraserver 的otaa请求,继续处理 JoinReq 和 RejoinReq 类型的请求,其余的请求报错。具体的处理交给了 lora-app-server/internal/join 包,join处理后生成一个backend.JoinAnsPayload,然后G4 把这个结构通过http返回给loraserver的G2G。
JoinReq
回顾 loraserver 源码解析 (五) loraserver 中的startLoRaServer, loraserver的 G2G 会根据来自于 Bridge报文的类型进行处理,如果是lorawan.JoinRequest 则执行 join.Handle(rxPacket), 注意loraserver 和 app-server都有各自的join包, 这里的 join 包是 loraserver的。在 loraserver的join包中,会调用 jsClient.JoinReq(),此函数把 报文 转换为 JoinReqPayload 结构,经http发送给lora-app-server 的G4。
JoinReqPayload 是 lorawan源码中的包。lora相关的各种结构体及其数据解析都被集中写到了lorawan工程中,lora-app-server 和 loraserver 以及 lora-gateway-bridge都通过加入lorawan 源码而引入lora相关结构体。loranwan就像第三方库一样被加到了app,bridge,loraserver 3个程序中。
// JoinReqPayload defines the JoinReq message payload.
type JoinReqPayload struct {
BasePayload
MACVersion string `json:"MACVersion"` // e.g. "1.0.2"
PHYPayload HEXBytes `json:"PHYPayload"`
DevEUI lorawan.EUI64 `json:"DevEUI"`
DevAddr lorawan.DevAddr `json:"DevAddr"`
DLSettings lorawan.DLSettings `json:"DLSettings"`
RxDelay int `json:"RxDelay"`
CFList HEXBytes `json:"CFList,omitempty"` // Optional
}
lora-app-server/internal/join 包处理 joinRequest 过程如下
var joinTasks = []func(*context) error{
setJoinContext, //从joinReqPayload.PHYPayload中 解析出netID,joinEUI,devEUI,devNone 等值
getDevice, //根据前面解析出的 devEUI 从PostgreSQL获取设备信息
getApplication, //根据设备信息的ApplicationID,从PostgreSQL获取该设备的application信息
getDeviceKeys, //根据devEUI 从PostgreSQL获取 DeviceKeys(NwkKey,AppKey,JoinNonce)
validateMIC, //用NwkKey对phyPayload进行mic校验
setJoinNonce, //JoinNonce++,并存回 PostgreSQL, 这里遵照了1.1版本的协议文档
setSessionKeys, //由前面获取的信息生成 fNwkSIntKey appSKey sNwkSIntKey nwkSEncKey
createDeviceActivationRecord, //PostgreSQL的device_activation表中插入一条 DeviceActivation 数据,主要包含了 DevEUI 及其相关的keys值
sendJoinNotification,
createJoinAnsPayload, // 生成lorawan.PHYPayload,序列化后填充到backend.JoinAnsPayload
}
/*
sendJoinNotification
打包出JoinNotification 发出 otaa 加入 通知
pl := handler.JoinNotification{
ApplicationID: ctx.device.ApplicationID,
ApplicationName: ctx.application.Name,
DeviceName: ctx.device.Name,
DevEUI: ctx.device.DevEUI,
DevAddr: ctx.joinReqPayload.DevAddr,
}
*/
注意joinTasks任务的sendJoinNotification
app-server实现了观察者模式的通知,把 入网完毕请求通知给所有的观察者。
这些观察者被保存在了PostgreSQL 的 integration 表格中。
每次发送通知的时候,都会先从 PostgreSQL 获取所有的观察者,然后逐个通知
RejoinReq
RejoinReqPayload 和 JoinReqPayload 的内容是一样的
// RejoinReqPayload defines the RejoinReq message payload.
type RejoinReqPayload struct {
BasePayload
MACVersion string `json:"MACVersion"` // e.g. "1.0.2"
PHYPayload HEXBytes `json:"PHYPayload"`
DevEUI lorawan.EUI64 `json:"DevEUI"`
DevAddr lorawan.DevAddr `json:"DevAddr"`
DLSettings lorawan.DLSettings `json:"DLSettings"`
RxDelay int `json:"RxDelay"`
CFList HEXBytes `json:"CFList,omitempty"` // Optional
}
rejoinReq 处理流程如下
var rejoinTasks = []func(*context) error{
setRejoinContext, // 解析出 phyPayload netID joinEUI devEUI devNonce , 解析phyPayload 时,会根据 data[1] 来判断到底是那种类型的rejoinRequst,不同类型的rejoinRequst,存数据的位序是不同的。目前有3种类型,0,2类型的位序是一样的。
getDevice,
getApplication,
getDeviceKeys,
setJoinNonce,
setSessionKeys,
createDeviceActivationRecord,
sendJoinNotification,
createRejoinAnsPayload,
}
rejoinTasks 的函数除setRejoinContext外,其余函数和 joinTasks的代码是一模一样的,请参考前面。
区别在于 rejoinTasks 没有再校验mic
startClientAPI 开启 G5
G5 注册好rpc接口后,可通过https协议对外提供服务。
内部loraserver程序间可通过 rpc 访问这些接口。 对外则可以通过 https json 进行访问。
可参考 grpc 和 restfull 共用一个端口 来帮助理解app-server
对外接口
可以通过 以下三种
gRPC
RESRFul JSON API
MQTT
gRPC
~/go/gopath/src/github.com/brocaar/lora-app-server/api $ ls *.proto
application.proto gateway.proto profiles.proto
common.proto gatewayProfile.proto serviceProfile.proto
device.proto internal.proto user.proto
deviceProfile.proto networkServer.proto
deviceQueue.proto organization.proto
api目录下已经生成了 go 语言的 gRPC 接口源码
接口虽多,绝大部分都是增删改查
登陆
注意 登陆后会拿到 服务器返回的 jwt 串, 后续的RPC调用的context 都必须携带这个 jwt 否则无法通过服务器的授权认证检查。
具体参考下面的代码。
func DialTls(crtFile,address string) (*grpc.ClientConn, error) {
// TLS连接
b, err := ioutil.ReadFile(crtFile)
if err != nil {
log.Fatal(err)
}
cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(b) {
log.Fatal(err)
}
grpcDialOpts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
// given the grpc-gateway is always connecting to localhost, does
// InsecureSkipVerify=true cause any security issues?
InsecureSkipVerify: true,
RootCAs: cp,
}))}
conn, err := grpc.Dial(address, grpcDialOpts...)
return conn,err
}
func Login(name, password string) error {
conn, err := DialTls("server.crt", "127.0.0.1:8080")
client = pb.NewInternalServiceClient(conn)
req := pb.LoginRequest{Username:name, Password:password}
res,err := client.Login(context.Background(), &req)
if err != nil {
log.Fatalln(err)
}
l.Jwt = res.Jwt
md := metadata.Pairs("authorization", "Bearer "+l.Jwt)
ctx := metadata.NewOutgoingContext(context.Background(), md)
return err
}
MQTT
下面的代码可以侦听 设备的join数据
具体有哪些主题,请参考
https://www.loraserver.io/lora-app-server/integrate/sending-receiving/mqtt/
package main
import (
"time"
"sync"
"encoding/json"
log "github.com/sirupsen/logrus"
"github.com/eclipse/paho.mqtt.golang"
)
const (
uplink = "application/5/device/+/join" // 注意 这里的5是 appID, 请确保这个值是你电脑上正确的 application ID, 如果想侦听 报文消息就把 join 改成 rx
broker = "tcp://localhost:1883"
)
type MQTTHandler struct {
conn mqtt.Client
wg sync.WaitGroup
}
func NewHandler() *MQTTHandler {
h := MQTTHandler{}
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
h.conn = mqtt.NewClient(opts)
for {
if token := h.conn.Connect(); token.Wait() && token.Error() != nil {
log.Errorf("handler/mqtt: connecting to broker error, will retry in 2s: %s", token.Error())
time.Sleep(2 * time.Second)
} else {
break
}
}
return &h
}
func OnRecv(c mqtt.Client, msg mqtt.Message) {
var info Person
err := json.Unmarshal(msg.Payload(), &info)
if err != nil {
log.Error(err)
return
}
log.Info(msg)
}
func main() {
h := NewHandler()
h.conn.Subscribe(uplink, 0, OnRecv)
for {
}
}
相关源码解析
在 lora-app-server 源码的
root_run.go 中初始化了 mqtt 接口的相关代码
func setHandler() error {
h, err := mqtthandler.NewHandler(
config.C.Redis.Pool,
config.C.ApplicationServer.Integration.MQTT,
)
if err != nil {
return errors.Wrap(err, "setup mqtt handler error")
}
config.C.ApplicationServer.Integration.Handler = multihandler.NewHandler(h)
return nil
}
mqtthandler 是 默认的 mqtt消息发布者
参考本文前面JoinReq的描述,loraserver 会把报文通过 grpc传递给 app-server, app-server 处理的过程中会调用下面这个函数
func sendJoinNotification(ctx *context) error {
pl := handler.JoinNotification{
ApplicationID: ctx.device.ApplicationID,
ApplicationName: ctx.application.Name,
DeviceName: ctx.device.Name,
DevEUI: ctx.device.DevEUI,
DevAddr: ctx.joinReqPayload.DevAddr,
}
err := eventlog.LogEventForDevice(ctx.device.DevEUI, eventlog.EventLog{
Type: eventlog.Join,
Payload: pl,
})
if err != nil {
log.WithError(err).Error("log event for device error")
}
err = config.C.ApplicationServer.Integration.Handler.SendJoinNotification(pl)
if err != nil {
return errors.Wrap(err, "send join notification error")
}
return nil
}
这个函数最终调用了 config.C.ApplicationServer.Integration.Handler.SendJoinNotification(pl)
这个 config.C.ApplicationServer.Integration.Handler 就是早先 multihandler.NewHandler(h) 创建的
mqtthandler 是默认handler,multihandler还会去数据库拿到更多加在 application 的 Integrations 页面的handlers
所有hanlders的 SendJoinNotification()都被调用。
mqtthandler的SendJoinNotification()函数把 报文发布给了 mqtt 主题, 我们的客户端代码通过这个主题来接收消息,知道有设备otaa上来了
更多推荐
所有评论(0)