这是三部分博客系列的第 3 部分,我们将研究将我的旧Kenwood KR-V7080 接收器连接到互联网并使用Siri Shortcuts控制它。在这篇文章中,我们将:

  • 将 KR-V7080 接收器连接到 Azure IoT Hub

  • 使用Azure函数将KR-V7080接收器暴露在万维网中

  • 设置 Siri 快捷方式以使用语音命令控制 KR-V7080 接收器

如果您觉得需要一些上下文来加快速度,请查看我在该系列中以前的帖子:

  • 第 1 部分:使用 Arduino Uno 的 IR 模块简介

  • 第 2 部分:使用 HM-10 模块的蓝牙通信

但是为什么呢?

  1. 好的,我知道你们中的一些人想知道为什么现在有人想使用这个旧东西,更不用说试图通过手机控制它了。好吧,你说得对,这是一件古老的事情,我不会为此得到超过 100 美元。所以我认为揭开盖子看看如何将它连接到互联网并用我们的声音控制它会是一个更有趣的挑战。

  2. 为什么是语音指令?所以前几天我在看Ad Astra,这是一部设定在不远的未来的科幻电影。好吧,有一个场景,布拉德皮特正在通过人工智能个人助理给他的妻子写一封电子邮件/消息,我想用语音命令控制房子周围的东西是多么酷。最初我想使用Amazon Alexa,但后来我偶然发现了 Siri Shortcuts ,这似乎是一条更简单的路径,因为我已经有了 Apple 设备。

将 KR-V7080 接收器连接到 Azure IoT Hub

从高层次的角度来看,我们在上一篇博文结束时得到的内容如下所示。

[使用 HM-10 模块的蓝牙通信](https://res.cloudinary.com/practicaldev/image/fetch/s--XqcVV-b0--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/ https://dev-to-uploads.s3.amazonaws.com/uploads/articles/cfkcuvyo9l86tnxotm8g.png)

在本节中,我们想要达到的内容如下所示。

[从 Azure IoT 中心与 KR-V7080 通信](https://res.cloudinary.com/practicaldev/image/fetch/s--6lxXHWQC--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/ https://dev-to-uploads.s3.amazonaws.com/uploads/articles/th6eud0hnlt8196bum7i.png)

将树莓派集成到解决方案中

在 Raspberry Pi 和 Arduino 设备之间建立 BLE 数据连接是一个相当简单的过程,因为我们已经在Raspberry Pi,见 Arduino 中介绍了这一点。 Arduino,见见树莓派。让我们谈谈蓝牙 (LE)。让我们创建一个小型 python 应用程序,以便我们可以与我们的 Arduino Uno 进行通信。

KenwoodRemoteAgent.py

   '''
   Description: A helper class to simulate a Kenwood RC-R0803 remote over BlE.
   '''

   import asyncio
   from bleak import BleakScanner
   from bleak.backends.bluezdbus.client import BleakClientBlueZDBus

   class KenwoodRemoteAgent:
      def __init__(self, device_name, data_channel_uuid, logger):
         self.device_name = device_name
         self.data_channel_uuid = data_channel_uuid
         self.logger = logger
         self.device_found = False
         self.device_connected = False
         self.client = None

      async def run(self):
         while not self.device_found:
               device = await BleakScanner.find_device_by_filter(
                  lambda d, ad: d.name and d.name.lower() == self.device_name.lower()
               )

               if device is None:
                  self.logger.info("{} not found".format(self.device_name))
                  await asyncio.sleep(1)
               else:
                  self.logger.info("{} found".format(device))
                  self.device_found = True

         self.client = BleakClientBlueZDBus(device)

         while not self.device_connected:
               try:
                  if await self.client.connect():
                     self.device_connected = True
                     self.logger.info("Connected to {}".format(self.device_name))
               except:
                  self.logger.info("Connected to {} failed".format(self.device_name))

               if not self.device_connected:
                  await asyncio.sleep(1)
                  self.logger.info("Retrying...")

      async def send_command(self, command: str):
         await self.client.write_gatt_char(self.data_channel_uuid, command.encode('UTF-8'))

      async def stop(self):
         if not self.device_connected:
               return

         try:
               if await self.client.disconnect():
                  self.device_connected = False
                  self.logger.info("Disconnected from {}".format(self.device_name))
         except:
               self.logger.info("Disconnected from {} failed".format(self.device_name))

进入全屏模式 退出全屏模式

app.py

   '''
   Description: Simple app to interact with HM-10 BLE module.
   '''
   import asyncio
   import logging
   import signal
   import sys
   from time import gmtime
   from turtle import delay

   from KenwoodRemoteAgent import KenwoodRemoteAgent

   logging.basicConfig(filename='/home/pi/MyHomeAgent/events.log', encoding='utf-8', format='%(asctime)s %(module)-20s %(message)s', level=logging.DEBUG)
   logging.Formatter.converter = gmtime

   device_name = "KR-V7080"
   data_channel_uuid = "0000ffe1-0000-1000-8000-00805f9b34fb"

   async def main():

      kenwood_remote_agent = None
      execution_is_over = asyncio.Future()

      async def abort_handler(signame):
         execution_is_over.set_result("Ctrl+C")
         logging.info("Cleaning up before exiting...")

         if kenwood_remote_agent is not None:
               await kenwood_remote_agent.stop()

      loop = asyncio.get_event_loop()
      for signame in ('SIGINT', 'SIGTERM'):
         loop.add_signal_handler(getattr(signal, signame),
                                 lambda: asyncio.ensure_future(abort_handler(signame)))

      try:
         logging.info("Started...")
         kenwood_remote_agent = KenwoodRemoteAgent(device_name, data_channel_uuid, logging)
         await kenwood_remote_agent.run()

         # Send command to toggle power on/off
         await kenwood_remote_agent.send_command("power#")
         await asyncio.sleep(3)
         # Send an unknown command
         await kenwood_remote_agent.send_command("Blah blah#")

         await execution_is_over
         asyncio.get_event_loop().stop()

      except Exception:
         logging.error("Stopped!")

   if __name__ == "__main__":
      asyncio.get_event_loop().run_until_complete(main())

进入全屏模式 退出全屏模式

让我们回顾一下上面的代码,大致了解它在做什么。app.py中的main()方法基本上创建了一个 KenwoodRemoteAgent 类的实例,其名称为KR-V7080uuid(通用唯一标识符),与 HM-10 模块提供的自定义 BLE 特性相关联。然后调用实例方法run(),它将尝试查找 KR-V7080 设备并连接到它。

连接后,main() 方法将发送命令power#Blah blah#到 HM-10 模块。 HM-10 模块纯粹充当数据通道,它只是将接收到的内容传递给 Arduino Uno,反之亦然。请注意,加载到 Arduino Uno 上的草图定义了一个有效命令表,因此只有有效命令被转发到 IR Tx 模块并传输到 Kenwood KR-V7080 接收器。

[Raspberry Pi 与 HM-10 通信](https://res.cloudinary.com/practicaldev/image/fetch/s--PG1pXvVe--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_880/https:/ /dev-to-uploads.s3.amazonaws.com/uploads/articles/l0g0ntjxk5cveydz5at5.gif)

上面的演示在左侧显示了我们运行app.py程序的终端,而右侧的终端显示了 Arduino Uno 上的活动。

将 Azure IoT Hub 集成到解决方案中

继续我之前关于 Azure IoT Hub 的博客文章,我们将使用从这些博客文章中创建的资源,并将它们集成到此解决方案中,以节省我一些时间。如果您想了解有关 Azure IoT Hub 的更多信息,请随时查看以下内容:

  • Azure IoT 中心入门

  • Azure IoT 设备入门

  • 使用 Azure 函数处理 Azure IoT 中心事件

  • 使用 Python 实现 Azure IoT 设备

  • 使用遥控钥匙寻呼手机

我们将扩展我们的小应用程序以连接到 Azure IoT 中心作为MyHomeAgent,这是我在之前的博客文章中创建的 Azure IoT 设备。

AzureIotDeviceAgent.py

   '''
   Description: A helper class to interact with Azure IoT Hub.
   '''

   import logging
   from pb_Parcel_pb2 import pb_Parcel
   from azure.iot.device.aio import IoTHubDeviceClient
   from google.protobuf import json_format
   from azure.iot.device import MethodResponse

   class AzureIotDeviceAgent:
      def __init__(self, name: str, connection_string: str, logger: logging):
         self.name= name
         self.device_client = IoTHubDeviceClient.create_from_connection_string(connection_string)
         self.device_client.on_method_request_received = self.method_request_handler
         self.logger = logger
         self.clients = []

      async def connect(self):
         if self.device_client.connected:
               return

         await self.device_client.connect()
         self.logger.info("{} connected".format(self.name))

      async def disconnect(self):
         if not self.device_client.connected:
               return

         await self.device_client.disconnect()
         self.logger.info("{} disconnected".format(self.name))

      # Add handler for incoming parcels
      def add_client(self, client):
         self.clients.append(client)

      async def send_parcel(self, parcel: pb_Parcel):
         if not self.device_client.connected:
               self.logger.error("{} not connected".format(self.name))
               return

         # Populate source domain and domain agent
         parcel.source.domain_agent = self.name
         parcel.source.domain = "Device Domain"
         parcel = parcel.SerializeToString()

         # Note that parcel here is serialised to a byte array, not UTF8 string.
         await self.device_client.send_message(parcel)

      async def method_request_handler(self, method_request):
         status_code = 200
         payload = {"result": True, "data": "parcel handled"}

         if method_request.name != "ProcessMessage":
               status_code = 404
               payload = {"result": False, "data": "unknown method request"}

         parcel = json_format.ParseDict(method_request.payload, pb_Parcel(), True)

         if parcel is None:
               status_code = 400
               payload = {"result": False, "data": "no parcel received"}
         else:
               for client in self.clients:
                  if client.name == parcel.destination.name:
                     await client.process_parcel(parcel)
                     return
               status_code = 503
               payload = {"result": False, "data": "no parcel handler"}

         method_response = MethodResponse.create_from_method_request(method_request, status_code, payload)
         await self.device_client.send_method_response(method_response)

进入全屏模式 退出全屏模式

KenwoodRemoteAgent.py

   '''
   Description: A helper class to simulate a Kenwood RC-R0803 remote over BlE.
   '''

   ...

   class KenwoodRemoteAgent:
      def __init__(self, device_name, data_channel_uuid, logger: logging):
         ...

      async def run(self):
         ...

      async def send_command(self, command: str):
         ...

      async def stop(self):
         ...

      async def process_parcel(self, parcel: pb_Parcel):
         if parcel.type == "ASCII":
               await self.send_command(parcel.content)
         else:
               self.logger.warning("Unexpected content type: {}".format(parcel.type))

进入全屏模式 退出全屏模式

pb_Endpoint.proto

   syntax = "proto3";

   message pb_Endpoint {
   string name           = 1;
   string local_id       = 2;
   string domain_agent   = 3;
   string domain         = 4;
   }

进入全屏模式 退出全屏模式

pb_Parcel.proto

   syntax = "proto3";

   import "pb_Endpoint.proto";

   message pb_Parcel {
   pb_Endpoint source      = 1;  // Sender endpoint
   pb_Endpoint destination = 2;  // Recipient endpoint
   string type             = 3;  // Name of parcel content
   string content          = 4;  // UTF-8 encoded of parcel content
   }

进入全屏模式 退出全屏模式

app.py

   '''
   Description: Simple app to interact with HM-10 BLE module.
   '''

   ...

   from AzureIotDeviceAgent import AzureIotDeviceAgent

   my_home_agent_name = "MyHomeAgent"
   my_home_agent_connection_string = "my-home-agent-connection-string"

   async def main():
      ...

      my_home_agent = None

      try:
         ...

         my_home_agent = AzureIotDeviceAgent(my_home_agent_name, my_home_agent_connection_string, logging)
         await my_home_agent.connect()
         my_home_agent.add_client(kenwood_remote_agent)

      except Exception:
         ...

         if my_home_agent is not None:
               my_home_agent.disconnect()

         logging.error("Stopped!")

   if __name__ == "__main__":
      asyncio.get_event_loop().run_until_complete(main())

进入全屏模式 退出全屏模式

现在让我们运行代码。

我们添加了AzureIotDeviceAgent.py,这是一个帮助程序类,用于连接到 IoT 中心以及从中发送和接收数据。

当我们想向 IoT Hub 发送数据时,我们可以调用send_parcel()方法。

当我们要接收数据时,我们通过add_client()方法注册一个客户端。如果传入的数据发往已注册的客户端,则数据将通过客户端的process_parcel()方法传递给该客户端进行处理。

KenwoodRemoteAgent.py已修改为包含process_parcel()方法,因为如果数据通过 Internet 发送到此设备 (KR-V7080),则会调用该方法。process_parcel()所做的只是通过 BLE 将数据转发到 Arduino Uno。

app.py中的main()方法修改为创建AzureIotDeviceAgent实例,与IoT Hub建立连接,注册KenwoodRemoteAgent实例为客户端。

发送到 IoT 中心和从 IoT 中心接收的数据被封装为 pb_Parcel 对象 (parcels),其中包含有关数据来自哪里、需要去哪里以及有效负载中的数据类型的信息。为了帮助我们通过互联网在设备/服务/应用程序之间发送这些包裹,我实施了一个后端解决方案 (Multilinks),其中包含一组无服务器功能,以确保包裹被转发到正确的目的地。我希望在某个时候更详细地介绍 Multilinks,但现在只假设存在后端服务来通过 Internet 处理包裹。

使用 Azure Function 从 Internet 与 KR-V7080 接收器通话

此时,我们仅将 KR-V7080 接收器连接到 IoT Hub。然而,我们并没有办法与 IoT Hub 通信并控制 KR-V7080 接收器。

在本节中,我们将添加一个Azure Function作为webhook与 KR-V7080 Receiver 通信。解决方案将类似于下图所示。

[通过 Webhook 与 KR-V7080 通信](https://res.cloudinary.com/practicaldev/image/fetch/s--5FsR8GLz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https ://dev-to-uploads.s3.amazonaws.com/uploads/articles/lswwjjemqzha4a6mnhg1.png)

正如我之前提到的,我已经实现了Multilinks,这是一个能够在 Internet 上的两个端点之间传递数据的后端服务。唯一的要求是所有端点都需要在 Multilinks 的 Endpoint Registry 中注册,这允许 Multilinks 处理数据的传递,而发送者无需担心数据将如何到达那里。这个概念与我们想要向某人发送包裹的现实世界场景非常相似。首先我们把内容放在一个盒子里,然后标明它需要去哪里以及是谁寄的,然后把它送到邮局。只要它到达那里,我们并不担心它是否会通过陆路、海上或空中到达那里。

我们要创建的 webhook 类似于我们投递包裹的邮局。这个 webhook 的实现如下所示。

ExternalServicesInboundAgent.cs

   using System;
   using System.IO;
   using System.Threading.Tasks;
   using Microsoft.AspNetCore.Mvc;
   using Microsoft.Azure.WebJobs;
   using Microsoft.Azure.WebJobs.Extensions.Http;
   using Microsoft.AspNetCore.Http;
   using Microsoft.Extensions.Logging;
   using Newtonsoft.Json;
   using Microsoft.Azure.WebJobs.Extensions.DurableTask;

   namespace Multilinks.InboundAgents
   {
      public class ExternalServicesInboundAgent
      {
         [FunctionName("ExternalServicesInboundAgent")]
         public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
         {
            log.LogInformation("[ExternalServicesInboundAgent] Parcel received");

            string requestBody = String.Empty;

            using (StreamReader streamReader = new StreamReader(req.Body))
            {
               requestBody = await streamReader.ReadToEndAsync();
            }

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            string source = data?.source;
            string destination = data?.destination;
            string type = data?.type;
            string content = data?.content;

            if (string.IsNullOrEmpty(source) ||
               string.IsNullOrEmpty(destination) ||
               string.IsNullOrEmpty(type) ||
               string.IsNullOrEmpty(content))
            {
               log.LogError("[ExternalServicesInboundAgent] Invalid parcel");
               return new BadRequestObjectResult("Parcel paramters Source, Destination, Type and Content are all required.");
            }

            pb_Parcel parcel = new pb_Parcel();
            parcel.Source = new pb_Endpoint();
            parcel.Destination = new pb_Endpoint();

            parcel.Source.Name = source;
            parcel.Destination.Name = destination;
            parcel.Type = type;
            parcel.Content = content;

            /*
               Let's start the orchestration.
            */
            var instanceId = await starter.StartNewAsync("MultilinksOrchestrator", parcel);

            log.LogInformation("[ExternalServicesInboundAgent] Parcel sent");
            return new OkObjectResult("Parcel sent");
         }
      }
   }

进入全屏模式 退出全屏模式

让我们回顾一下上面的代码,大致了解发生了什么。

从函数头声明开始,我们看到它需要 3 个参数:

  • [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req

  • [DurableClient] IDurableOrchestrationClient starter

  • ILogger log

第一个是触发该函数执行的原因,在本例中是一个 HTTP POST 请求。AuthorizationLevel.Function表示此函数受 SAS 密钥保护(查看Function access keys部分下的Azure Functions HTTP 触发器)。

第二个是DurableClient,用于指定处理请求的Durable Function Orchestrator

最后是用于记录目的的记录服务。

我们希望 POST 请求具有包含以下字段的 json 类型的有效负载:

  • 来源

  • 目的地

  • 内容

如果缺少其中任何一个,请使用bad request状态响应向发件人指示。否则,使用提供的数据创建一个包裹并将其传递给函数协调器来处理,即MultilinksOrchestrator

好吧,让我们试试这个新的 webhook。我们将使用Postman发送 POST 请求。理论上,请求将按照上面的顺序图传播,并按请求作用于 KR-V7080 接收器。

[Webhook 请求演示](https://res.cloudinary.com/practicaldev/image/fetch/s--tyc2uRMX--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_66%2Cw_880/https://dev-to -uploads.s3.amazonaws.com/uploads/articles/cxbx6fwwcgh3u82tskte.gif)

在上面的演示中,我们发布了 3 个 HTTP 请求。

  • 首先,我们发送了一个没有任何负载的 POST 请求,正如预期的那样,我们得到了一个错误的请求响应。

  • 第二次,我们发送了一个带有有效载荷的 POST 请求。但是,寻址端点尚未在endpoint registry中注册(加上未寻址到 KR-V7080),因此该请求从未转发到 Arduino Uno,我们在 Arduino Uno 的串行控制台中看不到任何输出。

  • 最后,我们发送了一个带有有效负载的 POST 请求,并发送至 KR-V7080。该请求被转发到 Arduino Uno,我们在 Arduino Uno 的串行控制台中看到它已收到请求并将适当的 IR 命令发送到 KR-V7080 接收器。

设置 Siri 快捷方式以使用语音命令控制 KR-V7080 接收器

在本节中,我们将通过语音命令控制 KR-V7080 接收器来扩展上一节。解决方案将类似于下图所示。

[通过语音命令与 KR-V7080 通信](https://res.cloudinary.com/practicaldev/image/fetch/s--7VYKqw1S--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https ://dev-to-uploads.s3.amazonaws.com/uploads/articles/soa7noihszuhalm0lz40.png)

如本文开头所述,我们将在 Apple Watch 上使用Siri Shortcuts来触发语音命令。具体来说,当我们说Stereo OnStereo Off时,我们希望使用 Siri Shortcuts 发送 HTTP POST 请求。我将总结我们需要为我们的场景做些什么,但如果您想了解更多信息,请务必查看上面的 Siri Shortcuts 链接。

[Siri 快捷方式](https://res.cloudinary.com/practicaldev/image/fetch/s--feLGoESz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to- uploads.s3.amazonaws.com/uploads/articles/67qymp373ayo8yhezztu.png)

在上图中,创建了两个快捷方式。一种用于要求 Siri 转动stereo on时,另一种用于要求 Siri 转动stereo off时。

[Siri 快捷方式详情](https://res.cloudinary.com/practicaldev/image/fetch/s--dMuSx4Wz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to -uploads.s3.amazonaws.com/uploads/articles/ivh3k505e4g2cs4c9yk4.png)

在上图中,我们看到了stereo on快捷方式的详细信息:

  • 子标题Hey Siri, Stereo On是触发此快捷方式的语音命令

  • 请求将被发送到https://webhookaddress.com?code=access-code-goes-here

  • 请求将是 HTTPPOST

  • 有效负载将是一个包含parcel数据的JSON对象

下面是运行中的快捷方式的演示。我们拥有的是 Arduino Uno、Apple Watch 和 Kenwood 音响。 Apple Watch 用于发出语音命令,Siri 处理这些命令并将其发送到 Internet,然后转发到 Arduino Uno,后者可以打开和关闭立体声音响。

最后的想法

虽然我们实现了我们打算做的事情,但我对看到的延迟不太满意(但我想这是意料之中的,因为该解决方案涉及如此多的网络绑定任务)。例如:

  • Siri 处理语音命令并将其转换为 HTTP 请求

  • 无服务器功能可能需要冷启动

  • 处理从 IoT Hub 发送到 Raspberry Pi 的请求。

在最坏的情况下,完成一个请求大约需要 10-15 秒。但一般来说,它需要大约一半的时间。虽然由于 Siri 我们不能对延迟做太多,但我认为我们肯定可以改善其他两个的延迟,但我想我会再等一天。

这篇博文最初发布在我的博客网站An IoT Odyssey

Logo

云原生社区为您提供最前沿的新闻资讯和知识内容

更多推荐